Repository: hive Updated Branches: refs/heads/branch-3 649c1c55b -> db8e9b0ef
HIVE-20740 : Remove global lock in ObjectStore.setConf method (Vihang Karajgaonkar, reviewed by Andrew Sherman and Naveen Gangam) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/db8e9b0e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/db8e9b0e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/db8e9b0e Branch: refs/heads/branch-3 Commit: db8e9b0efd058c0a0bd18334f44608c93a84077e Parents: 649c1c5 Author: Vihang Karajgaonkar <vihan...@apache.org> Authored: Thu Nov 29 15:07:42 2018 -0800 Committer: Vihang Karajgaonkar <vihan...@apache.org> Committed: Thu Nov 29 15:13:23 2018 -0800 ---------------------------------------------------------------------- .../hive/ql/parse/TestReplicationScenarios.java | 4 +- .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 3 +- .../hadoop/hive/ql/session/SessionState.java | 3 +- .../hadoop/hive/metastore/ObjectStore.java | 447 ++------------- .../metastore/PersistenceManagerProvider.java | 537 +++++++++++++++++++ .../hadoop/hive/metastore/TestObjectStore.java | 56 +- 6 files changed, 640 insertions(+), 410 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 4b6bc77..b3a19cb 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; -import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.PersistenceManagerProvider; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -179,7 +179,7 @@ public class TestReplicationScenarios { driverMirror = DriverFactory.newDriver(hconfMirror); metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror); - ObjectStore.setTwoMetastoreTesting(true); + PersistenceManagerProvider.setTwoMetastoreTesting(true); } @AfterClass http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 099b67a..4d048ef 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -65,6 +65,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.ObjectStore; +import 
org.apache.hadoop.hive.metastore.PersistenceManagerProvider; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.UDF; @@ -1090,7 +1091,7 @@ public class TestJdbcWithMiniHS2 { NucleusContext nc = null; Map<String, ClassLoaderResolver> cMap; try { - pmf = ObjectStore.class.getDeclaredField("pmf"); + pmf = PersistenceManagerProvider.class.getDeclaredField("pmf"); if (pmf != null) { pmf.setAccessible(true); jdoPmf = (JDOPersistenceManagerFactory) pmf.get(null); http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 6f39b03..844620f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.conf.HiveConfUtil; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.PersistenceManagerProvider; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -1810,7 +1811,7 @@ public class SessionState { } Class<?> clazz = Class.forName(realStoreImpl); if (ObjectStore.class.isAssignableFrom(clazz)) { - ObjectStore.unCacheDataNucleusClassLoaders(); + PersistenceManagerProvider.clearOutPmfClassLoaderCache(); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 545b5bd..59c4d22 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; import java.io.IOException; -import java.lang.reflect.Field; import java.net.InetAddress; import java.net.URI; import java.nio.ByteBuffer; @@ -34,7 +33,6 @@ import java.sql.Statement; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -45,29 +43,21 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; -import javax.jdo.JDOCanRetryException; import javax.jdo.JDODataStoreException; import javax.jdo.JDOException; -import 
javax.jdo.JDOHelper; import javax.jdo.JDOObjectNotFoundException; import javax.jdo.PersistenceManager; -import javax.jdo.PersistenceManagerFactory; import javax.jdo.Query; import javax.jdo.Transaction; -import javax.jdo.datastore.DataStoreCache; import javax.jdo.datastore.JDOConnection; import javax.jdo.identity.IntIdentity; -import javax.sql.DataSource; import com.google.common.base.Strings; @@ -162,8 +152,6 @@ import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; -import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; -import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory; import org.apache.hadoop.hive.metastore.metrics.Metrics; import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.model.MCatalog; @@ -217,15 +205,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.ObjectPair; import org.apache.thrift.TException; -import org.datanucleus.AbstractNucleusContext; -import org.datanucleus.ClassLoaderResolver; -import org.datanucleus.ClassLoaderResolverImpl; -import org.datanucleus.NucleusContext; -import org.datanucleus.api.jdo.JDOPersistenceManager; -import org.datanucleus.api.jdo.JDOPersistenceManagerFactory; import org.datanucleus.store.rdbms.exceptions.MissingTableException; -import org.datanucleus.store.scostore.Store; -import org.datanucleus.util.WeakValueMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -246,14 +226,9 @@ import com.google.common.collect.Sets; * filestore. */ public class ObjectStore implements RawStore, Configurable { - private static Properties prop = null; - private static PersistenceManagerFactory pmf = null; - private static boolean forTwoMetastoreTesting = false; private static final DateTimeFormatter YMDHMS_FORMAT = DateTimeFormatter.ofPattern( "yyyy_MM_dd_HH_mm_ss"); - - private static Lock pmfPropLock = new ReentrantLock(); /** * Verify the schema only once per JVM since the db connection info is static */ @@ -339,65 +314,43 @@ public class ObjectStore implements RawStore, Configurable { @Override @SuppressWarnings("nls") public void setConf(Configuration conf) { - // Although an instance of ObjectStore is accessed by one thread, there may - // be many threads with ObjectStore instances. So the static variables - // pmf and prop need to be protected with locks. 
- pmfPropLock.lock(); - try { - isInitialized = false; - this.conf = conf; - configureSSL(conf); - Properties propsFromConf = getDataSourceProps(conf); - boolean propsChanged = !propsFromConf.equals(prop); - - if (propsChanged) { - if (pmf != null){ - clearOutPmfClassLoaderCache(pmf); - if (!forTwoMetastoreTesting) { - // close the underlying connection pool to avoid leaks - pmf.close(); - } - } - pmf = null; - prop = null; - } - - assert(!isActiveTransaction()); - shutdown(); - // Always want to re-create pm as we don't know if it were created by the - // most recent instance of the pmf - pm = null; - directSql = null; - expressionProxy = null; - openTrasactionCalls = 0; - currentTransaction = null; - transactionStatus = TXN_STATUS.NO_STATE; - - initialize(propsFromConf); - - String partitionValidationRegex = - MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN); - if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) { - partitionValidationPattern = Pattern.compile(partitionValidationRegex); - } else { - partitionValidationPattern = null; - } + isInitialized = false; + this.conf = conf; + configureSSL(conf); + PersistenceManagerProvider.updatePmfProperties(conf); + + assert (!isActiveTransaction()); + shutdown(); + // Always want to re-create pm as we don't know if it were created by the + // most recent instance of the pmf + pm = null; + directSql = null; + expressionProxy = null; + openTrasactionCalls = 0; + currentTransaction = null; + transactionStatus = TXN_STATUS.NO_STATE; + + initialize(); + + String partitionValidationRegex = + MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN); + if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) { + partitionValidationPattern = Pattern.compile(partitionValidationRegex); + } else { + partitionValidationPattern = null; + } - // Note, if metrics have not been initialized this will return null, which means we aren't - // using metrics. Thus we should always check whether this is non-null before using. - MetricRegistry registry = Metrics.getRegistry(); - if (registry != null) { - directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS); - } + // Note, if metrics have not been initialized this will return null, which means we aren't + // using metrics. Thus we should always check whether this is non-null before using. + MetricRegistry registry = Metrics.getRegistry(); + if (registry != null) { + directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS); + } - if (!isInitialized) { - throw new RuntimeException( - "Unable to create persistence manager. Check dss.log for details"); - } else { - LOG.info("Initialized ObjectStore"); - } - } finally { - pmfPropLock.unlock(); + if (!isInitialized) { + throw new RuntimeException("Unable to create persistence manager. Check log for details"); + } else { + LOG.debug("Initialized ObjectStore"); } } @@ -410,81 +363,13 @@ public class ObjectStore implements RawStore, Configurable { } @SuppressWarnings("nls") - private void initialize(Properties dsProps) { - int retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS); - long retryInterval = MetastoreConf.getTimeVar(conf, - ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS); - int numTries = retryLimit; - - while (numTries > 0){ - try { - initializeHelper(dsProps); - return; // If we reach here, we succeed. 
- } catch (Exception e){ - shutdown(); - numTries--; - boolean retriable = isRetriableException(e); - if ((numTries > 0) && retriable){ - LOG.info("Retriable exception while instantiating ObjectStore, retrying. " + - "{} tries left", numTries, e); - try { - Thread.sleep(retryInterval); - } catch (InterruptedException ie) { - // Restore the interrupted status, since we do not want to catch it. - LOG.debug("Interrupted while sleeping before retrying.", ie); - Thread.currentThread().interrupt(); - } - // If we're here, we'll proceed down the next while loop iteration. - } else { - // we've reached our limit, throw the last one. - if (retriable){ - LOG.warn("Exception retry limit reached, not retrying any longer.", - e); - } else { - LOG.debug("Non-retriable exception during ObjectStore initialize.", e); - } - throw e; - } - } - } - } - - private static final Set<Class<? extends Throwable>> retriableExceptionClasses = - new HashSet<>(Arrays.asList(JDOCanRetryException.class)); - /** - * Helper function for initialize to determine if we should retry an exception. - * We return true if the exception is of a known type of retriable exceptions, or if one - * of its recursive .getCause returns a known type of retriable exception. - */ - private boolean isRetriableException(Throwable e) { - if (e == null){ - return false; - } - if (retriableExceptionClasses.contains(e.getClass())){ - return true; - } - for (Class<? extends Throwable> c : retriableExceptionClasses){ - if (c.isInstance(e)){ - return true; - } - } - - if (e.getCause() == null){ - return false; - } - return isRetriableException(e.getCause()); - } - - /** - * private helper to do initialization routine, so we can retry if needed if it fails. - * @param dsProps - */ - private void initializeHelper(Properties dsProps) { - LOG.info("ObjectStore, initialize called"); - prop = dsProps; - pm = getPersistenceManager(); + private void initialize() { + LOG.debug("ObjectStore, initialize called"); + // if this method fails, PersistenceManagerProvider will retry for the configured number of times + // before giving up + pm = PersistenceManagerProvider.getPersistenceManager(); LOG.info("RawStore: {}, with PersistenceManager: {}" + - " created in the thread with id: {}", this, pm, Thread.currentThread().getId()); + " created in the thread with id: {}", this, pm, Thread.currentThread().getId()); try { String productName = MetaStoreDirectSql.getProductName(pm); sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName), conf); @@ -497,7 +382,7 @@ public class ObjectStore implements RawStore, Configurable { dbType = determineDatabaseProduct(); expressionProxy = createExpressionProxy(conf); if (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)) { - String schema = prop.getProperty("javax.jdo.mapping.Schema"); + String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema"); schema = org.apache.commons.lang.StringUtils.defaultIfBlank(schema, null); directSql = new MetaStoreDirectSql(pm, conf, schema); } @@ -564,135 +449,10 @@ public class ObjectStore implements RawStore, Configurable { } } - /** - * Properties specified in hive-default.xml override the properties specified - * in jpox.properties. - */ - @SuppressWarnings("nls") - private static Properties getDataSourceProps(Configuration conf) { - Properties prop = new Properties(); - correctAutoStartMechanism(conf); - - // First, go through and set all our values for datanucleus and javax.jdo parameters. 
This - // has to be a separate first step because we don't set the default values in the config object. - for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) { - String confVal = MetastoreConf.getAsString(conf, var); - String varName = var.getVarname(); - Object prevVal = prop.setProperty(varName, confVal); - if (MetastoreConf.isPrintable(varName)) { - LOG.debug("Overriding {} value {} from jpox.properties with {}", - varName, prevVal, confVal); - } - } - - // Now, we need to look for any values that the user set that MetastoreConf doesn't know about. - // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly - // interpolated in case of variables. See HIVE-17788. - /* - for (Map.Entry<String, String> e : conf) { - if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) { - // We have to handle this differently depending on whether it is a value known to - // MetastoreConf or not. If it is, we need to get the default value if a value isn't - // provided. If not, we just set whatever the user has set. - Object prevVal = prop.setProperty(e.getKey(), e.getValue()); - if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) { - LOG.debug("Overriding " + e.getKey() + " value " + prevVal - + " from jpox.properties with " + e.getValue()); - } - } - } - */ - - // Password may no longer be in the conf, use getPassword() - try { - String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD); - if (org.apache.commons.lang.StringUtils.isNotEmpty(passwd)) { - // We can get away with the use of varname here because varname == hiveName for PWD - prop.setProperty(ConfVars.PWD.getVarname(), passwd); - } - } catch (IOException err) { - throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err); - } - - if (LOG.isDebugEnabled()) { - for (Entry<Object, Object> e : prop.entrySet()) { - if (MetastoreConf.isPrintable(e.getKey().toString())) { - LOG.debug("{} = {}", e.getKey(), e.getValue()); - } - } - } - - return prop; - } - - /** - * Update conf to set datanucleus.autoStartMechanismMode=ignored. - * This is necessary to able to use older version of hive against - * an upgraded but compatible metastore schema in db from new version - * of hive - * @param conf - */ - private static void correctAutoStartMechanism(Configuration conf) { - final String autoStartKey = "datanucleus.autoStartMechanismMode"; - final String autoStartIgnore = "ignored"; - String currentAutoStartVal = conf.get(autoStartKey); - if (!autoStartIgnore.equalsIgnoreCase(currentAutoStartVal)) { - LOG.warn("{} is set to unsupported value {} . 
Setting it to value: {}", autoStartKey, - conf.get(autoStartKey), autoStartIgnore); - } - conf.set(autoStartKey, autoStartIgnore); - } - - private static synchronized PersistenceManagerFactory getPMF() { - if (pmf == null) { - - Configuration conf = MetastoreConf.newMetastoreConf(); - DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); - if (dsp == null) { - pmf = JDOHelper.getPersistenceManagerFactory(prop); - } else { - try { - DataSource ds = dsp.create(conf); - Map<Object, Object> dsProperties = new HashMap<>(); - //Any preexisting datanucleus property should be passed along - dsProperties.putAll(prop); - dsProperties.put("datanucleus.ConnectionFactory", ds); - dsProperties.put("javax.jdo.PersistenceManagerFactoryClass", - "org.datanucleus.api.jdo.JDOPersistenceManagerFactory"); - pmf = JDOHelper.getPersistenceManagerFactory(dsProperties); - } catch (SQLException e) { - LOG.warn("Could not create PersistenceManagerFactory using " + - "connection pool properties, will fall back", e); - pmf = JDOHelper.getPersistenceManagerFactory(prop); - } - } - DataStoreCache dsc = pmf.getDataStoreCache(); - if (dsc != null) { - String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES); - LOG.info("Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", objTypes); - if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) { - String[] typeTokens = objTypes.toLowerCase().split(","); - for (String type : typeTokens) { - type = type.trim(); - if (PINCLASSMAP.containsKey(type)) { - dsc.pinAll(true, PINCLASSMAP.get(type)); - } else { - LOG.warn("{} is not one of the pinnable object types: {}", type, - org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " ")); - } - } - } - } else { - LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes"); - } - } - return pmf; - } - @InterfaceAudience.LimitedPrivate({"HCATALOG"}) @InterfaceStability.Evolving public PersistenceManager getPersistenceManager() { - return getPMF().getPersistenceManager(); + return PersistenceManagerProvider.getPersistenceManager(); } @Override @@ -9759,113 +9519,6 @@ public class ObjectStore implements RawStore, Configurable { throw new UnsupportedOperationException(); } - /** - * Removed cached classloaders from DataNucleus - * DataNucleus caches classloaders in NucleusContext. - * In UDFs, this can result in classloaders not getting GCed resulting in PermGen leaks. - * This is particularly an issue when using embedded metastore with HiveServer2, - * since the current classloader gets modified with each new add jar, - * becoming the classloader for downstream classes, which DataNucleus ends up using. - * The NucleusContext cache gets freed up only on calling a close on it. - * We're not closing NucleusContext since it does a bunch of other things which we don't want. - * We're not clearing the cache HashMap by calling HashMap#clear to avoid concurrency issues. 
- */ - public static void unCacheDataNucleusClassLoaders() { - PersistenceManagerFactory pmf = ObjectStore.getPMF(); - clearOutPmfClassLoaderCache(pmf); - } - - private static void clearOutPmfClassLoaderCache(PersistenceManagerFactory pmf) { - if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) { - return; - } - // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames - // so it's likely to stop working at some time in the future, especially if we upgrade DN - // versions, so we actively need to find a better way to make sure the leak doesn't happen - // instead of just clearing out the cache after every call. - JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf; - NucleusContext nc = jdoPmf.getNucleusContext(); - try { - Field pmCache = pmf.getClass().getDeclaredField("pmCache"); - pmCache.setAccessible(true); - Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>)pmCache.get(pmf); - for (JDOPersistenceManager pm : pmSet) { - org.datanucleus.ExecutionContext ec = pm.getExecutionContext(); - if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) { - ClassLoaderResolver clr = ((org.datanucleus.ExecutionContextThreadedImpl)ec).getClassLoaderResolver(); - clearClr(clr); - } - } - org.datanucleus.plugin.PluginManager pluginManager = jdoPmf.getNucleusContext().getPluginManager(); - Field registryField = pluginManager.getClass().getDeclaredField("registry"); - registryField.setAccessible(true); - org.datanucleus.plugin.PluginRegistry registry = (org.datanucleus.plugin.PluginRegistry)registryField.get(pluginManager); - if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) { - org.datanucleus.plugin.NonManagedPluginRegistry nRegistry = (org.datanucleus.plugin.NonManagedPluginRegistry)registry; - Field clrField = nRegistry.getClass().getDeclaredField("clr"); - clrField.setAccessible(true); - ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(nRegistry); - clearClr(clr); - } - if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) { - org.datanucleus.PersistenceNucleusContextImpl pnc = (org.datanucleus.PersistenceNucleusContextImpl)nc; - org.datanucleus.store.types.TypeManagerImpl tm = (org.datanucleus.store.types.TypeManagerImpl)pnc.getTypeManager(); - Field clrField = tm.getClass().getDeclaredField("clr"); - clrField.setAccessible(true); - ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(tm); - clearClr(clr); - Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr"); - storeMgrField.setAccessible(true); - org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr = (org.datanucleus.store.rdbms.RDBMSStoreManager)storeMgrField.get(pnc); - Field backingStoreField = storeMgr.getClass().getDeclaredField("backingStoreByMemberName"); - backingStoreField.setAccessible(true); - Map<String, Store> backingStoreByMemberName = (Map<String, Store>)backingStoreField.get(storeMgr); - for (Store store : backingStoreByMemberName.values()) { - org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore = (org.datanucleus.store.rdbms.scostore.BaseContainerStore)store; - clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class.getDeclaredField("clr"); - clrField.setAccessible(true); - clr = (ClassLoaderResolver)clrField.get(baseStore); - clearClr(clr); - } - } - Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField( - "classLoaderResolverMap"); - classLoaderResolverMap.setAccessible(true); - Map<String,ClassLoaderResolver> 
loaderMap = - (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc); - for (ClassLoaderResolver clr : loaderMap.values()){ - clearClr(clr); - } - classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>()); - LOG.debug("Removed cached classloaders from DataNucleus NucleusContext"); - } catch (Exception e) { - LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext", e); - } - } - - private static void clearClr(ClassLoaderResolver clr) throws Exception { - if (clr != null){ - if (clr instanceof ClassLoaderResolverImpl){ - ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr; - long resourcesCleared = clearFieldMap(clri,"resources"); - long loadedClassesCleared = clearFieldMap(clri,"loadedClasses"); - long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses"); - LOG.debug("Cleared ClassLoaderResolverImpl: {}, {}, {}", - resourcesCleared, loadedClassesCleared, unloadedClassesCleared); - } - } - } - private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName) throws Exception { - Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName); - mapField.setAccessible(true); - - Map<String,Class> map = (Map<String, Class>) mapField.get(clri); - long sz = map.size(); - mapField.set(clri, Collections.synchronizedMap(new WeakValueMap())); - return sz; - } - - @Override public List<SQLPrimaryKey> getPrimaryKeys(String catName, String db_name, String tbl_name) throws MetaException { @@ -10927,20 +10580,6 @@ public class ObjectStore implements RawStore, Configurable { } } - /** - * To make possible to run multiple metastore in unit test - * @param twoMetastoreTesting if we are using multiple metastore in unit test - */ - @VisibleForTesting - public static void setTwoMetastoreTesting(boolean twoMetastoreTesting) { - forTwoMetastoreTesting = twoMetastoreTesting; - } - - @VisibleForTesting - Properties getProp() { - return prop; - } - private void checkForConstraintException(Exception e, String msg) throws AlreadyExistsException { if (getConstraintException(e) != null) { LOG.error(msg, e); http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java new file mode 100644 index 0000000..20f0738 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java @@ -0,0 +1,537 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider; +import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory; +import org.apache.hadoop.hive.metastore.model.MDatabase; +import org.apache.hadoop.hive.metastore.model.MFieldSchema; +import org.apache.hadoop.hive.metastore.model.MOrder; +import org.apache.hadoop.hive.metastore.model.MPartition; +import org.apache.hadoop.hive.metastore.model.MSerDeInfo; +import org.apache.hadoop.hive.metastore.model.MStorageDescriptor; +import org.apache.hadoop.hive.metastore.model.MTable; +import org.apache.hadoop.hive.metastore.model.MType; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.datanucleus.AbstractNucleusContext; +import org.datanucleus.ClassLoaderResolver; +import org.datanucleus.ClassLoaderResolverImpl; +import org.datanucleus.NucleusContext; +import org.datanucleus.PropertyNames; +import org.datanucleus.api.jdo.JDOPersistenceManager; +import org.datanucleus.api.jdo.JDOPersistenceManagerFactory; +import org.datanucleus.store.scostore.Store; +import org.datanucleus.util.WeakValueMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jdo.JDOCanRetryException; +import javax.jdo.JDOHelper; +import javax.jdo.PersistenceManager; +import javax.jdo.PersistenceManagerFactory; +import javax.jdo.datastore.DataStoreCache; +import javax.sql.DataSource; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +/** + * This class is a wrapper class around PersistenceManagerFactory and its properties. + * These objects are static and need to be carefully modified together such that there are no + * race conditions when updating them. Additionally, this class provides thread-safe methods + * to get PersistenceManager instances from the current PersistenceManagerFactory. The most + * common usage of this class is to create a PersistenceManager from the existing PersistenceManagerFactory. + * PersistenceManagerFactory properties are modified less often, and hence the property-update path + * can make use of read/write locks such that it blocks only when the current properties change. 
+ */ +public class PersistenceManagerProvider { + private static PersistenceManagerFactory pmf; + private static Properties prop; + private static final ReentrantReadWriteLock pmfLock = new ReentrantReadWriteLock(); + private static final Lock pmfReadLock = pmfLock.readLock(); + private static final Lock pmfWriteLock = pmfLock.writeLock(); + private static final Logger LOG = LoggerFactory.getLogger(PersistenceManagerProvider.class); + private static final Map<String, Class<?>> PINCLASSMAP; + private static boolean forTwoMetastoreTesting; + private static int retryLimit; + private static long retryInterval; + + static { + Map<String, Class<?>> map = new HashMap<>(); + map.put("table", MTable.class); + map.put("storagedescriptor", MStorageDescriptor.class); + map.put("serdeinfo", MSerDeInfo.class); + map.put("partition", MPartition.class); + map.put("database", MDatabase.class); + map.put("type", MType.class); + map.put("fieldschema", MFieldSchema.class); + map.put("order", MOrder.class); + PINCLASSMAP = Collections.unmodifiableMap(map); + } + + private PersistenceManagerProvider() { + // prevent instantiation + } + + private static final Set<Class<? extends Throwable>> retriableExceptionClasses = + new HashSet<>(Arrays.asList(JDOCanRetryException.class)); + /** + * Helper function for initialize to determine if we should retry an exception. + * We return true if the exception is of a known type of retriable exceptions, or if one + * of its recursive .getCause returns a known type of retriable exception. + */ + private static boolean isRetriableException(Throwable e) { + if (e == null){ + return false; + } + if (retriableExceptionClasses.contains(e.getClass())){ + return true; + } + for (Class<? extends Throwable> c : retriableExceptionClasses){ + if (c.isInstance(e)){ + return true; + } + } + + if (e.getCause() == null){ + return false; + } + return isRetriableException(e.getCause()); + } + /** + * This method updates the PersistenceManagerFactory and its properties if the given + * configuration is different from its current set of properties. The most common case is that + * the persistenceManagerFactory properties do not change, and hence this method is optimized to + * be non-blocking in such cases. However, if the properties are different, this method blocks + * other threads until the properties are updated, the current pmf is closed, and + * a new pmf is re-initialized. Note that when a PersistenceManagerFactory is re-initialized all + * the PersistenceManagers which were instantiated using the old factory become invalid and will throw + * JDOUserException. Hence it is recommended that this method be called in the setup/init phase + * of the Metastore service when there are no other active threads serving clients. + * + * @param conf Configuration which provides the datanucleus/datasource properties for comparison + */ + public static void updatePmfProperties(Configuration conf) { + // take a read lock to check if the datasource properties changed. 
+ // Most common case is that datasource properties do not change + Properties propsFromConf = PersistenceManagerProvider.getDataSourceProps(conf); + pmfReadLock.lock(); + // keep track of if the read-lock is acquired by this thread + // so that we can unlock it before leaving this method + // this is needed because pmf methods below could throw JDOException (unchecked exception) + // which can lead to readLock not being acquired at the end of the inner try-finally + // block below + boolean readLockAcquired = true; + try { + // if pmf properties change, need to update, release read lock and take write lock + if (prop == null || pmf == null || !propsFromConf.equals(prop)) { + pmfReadLock.unlock(); + readLockAcquired = false; + pmfWriteLock.lock(); + try { + // check if we need to update pmf again here in case some other thread already did it + // for us after releasing readlock and before acquiring write lock above + if (prop == null || pmf == null || !propsFromConf.equals(prop)) { + // OK, now we really need to re-initialize pmf and pmf properties + if (LOG.isInfoEnabled()) { + LOG.info("Updating the pmf due to property change"); + if (prop == null) { + LOG.info("Current pmf properties are uninitialized"); + } else { + for (String key : prop.stringPropertyNames()) { + if (!key.equals(propsFromConf.get(key))) { + if (MetastoreConf.isPrintable(key) && !key.equals(ConfVars.CONNECT_URL_KEY.getVarname())) { + // The jdbc connection url can contain sensitive information like username and password + // which should be masked out before logging. + String oldVal = prop.getProperty(key); + String newVal = propsFromConf.getProperty(key); + LOG.debug("Found {} to be different. Old val : {} : New Val : {}", key, + oldVal, newVal); + } else { + LOG.debug("Found masked property {} to be different", key); + } + } + } + } + } + if (pmf != null) { + clearOutPmfClassLoaderCache(); + if (!forTwoMetastoreTesting) { + // close the underlying connection pool to avoid leaks + LOG.debug("Closing PersistenceManagerFactory"); + pmf.close(); + LOG.debug("PersistenceManagerFactory closed"); + } + pmf = null; + } + // update the pmf properties object then initialize pmf using them + prop = propsFromConf; + retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS); + retryInterval = MetastoreConf + .getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS); + // init PMF with retry logic + retry(() -> {initPMF(conf); return null;}); + } + // downgrade by acquiring read lock before releasing write lock + pmfReadLock.lock(); + readLockAcquired = true; + } finally { + pmfWriteLock.unlock(); + } + } + } finally { + if (readLockAcquired) { + pmfReadLock.unlock(); + } + } + } + + private static void initPMF(Configuration conf) { + DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf); + + if (dsp == null) { + pmf = JDOHelper.getPersistenceManagerFactory(prop); + } else { + try { + DataSource ds = dsp.create(conf); + Map<Object, Object> dsProperties = new HashMap<>(); + //Any preexisting datanucleus property should be passed along + dsProperties.putAll(prop); + dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds); + dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds); + dsProperties.put(ConfVars.MANAGER_FACTORY_CLASS.getVarname(), + "org.datanucleus.api.jdo.JDOPersistenceManagerFactory"); + pmf = JDOHelper.getPersistenceManagerFactory(dsProperties); + } catch (SQLException e) { + LOG.warn("Could not create PersistenceManagerFactory using " + + 
"connection pool properties, will fall back", e); + pmf = JDOHelper.getPersistenceManagerFactory(prop); + } + } + DataStoreCache dsc = pmf.getDataStoreCache(); + if (dsc != null) { + String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES); + LOG.info( + "Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", + objTypes); + if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) { + String[] typeTokens = objTypes.toLowerCase().split(","); + for (String type : typeTokens) { + type = type.trim(); + if (PINCLASSMAP.containsKey(type)) { + dsc.pinAll(true, PINCLASSMAP.get(type)); + } else { + LOG.warn("{} is not one of the pinnable object types: {}", type, + org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " ")); + } + } + } + } else { + LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. " + + "Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes"); + } + } + + /** + * Removed cached classloaders from DataNucleus + * DataNucleus caches classloaders in NucleusContext. + * In UDFs, this can result in classloaders not getting GCed resulting in PermGen leaks. + * This is particularly an issue when using embedded metastore with HiveServer2, + * since the current classloader gets modified with each new add jar, + * becoming the classloader for downstream classes, which DataNucleus ends up using. + * The NucleusContext cache gets freed up only on calling a close on it. + * We're not closing NucleusContext since it does a bunch of other things which we don't want. + * We're not clearing the cache HashMap by calling HashMap#clear to avoid concurrency issues. + */ + public static void clearOutPmfClassLoaderCache() { + pmfWriteLock.lock(); + try { + if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) { + return; + } + // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames + // so it's likely to stop working at some time in the future, especially if we upgrade DN + // versions, so we actively need to find a better way to make sure the leak doesn't happen + // instead of just clearing out the cache after every call. 
+ JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf; + NucleusContext nc = jdoPmf.getNucleusContext(); + try { + Field pmCache = pmf.getClass().getDeclaredField("pmCache"); + pmCache.setAccessible(true); + Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>) pmCache.get(pmf); + for (JDOPersistenceManager pm : pmSet) { + org.datanucleus.ExecutionContext ec = pm.getExecutionContext(); + if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) { + ClassLoaderResolver clr = + ((org.datanucleus.ExecutionContextThreadedImpl) ec).getClassLoaderResolver(); + clearClr(clr); + } + } + org.datanucleus.plugin.PluginManager pluginManager = + jdoPmf.getNucleusContext().getPluginManager(); + Field registryField = pluginManager.getClass().getDeclaredField("registry"); + registryField.setAccessible(true); + org.datanucleus.plugin.PluginRegistry registry = + (org.datanucleus.plugin.PluginRegistry) registryField.get(pluginManager); + if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) { + org.datanucleus.plugin.NonManagedPluginRegistry nRegistry = + (org.datanucleus.plugin.NonManagedPluginRegistry) registry; + Field clrField = nRegistry.getClass().getDeclaredField("clr"); + clrField.setAccessible(true); + ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(nRegistry); + clearClr(clr); + } + if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) { + org.datanucleus.PersistenceNucleusContextImpl pnc = + (org.datanucleus.PersistenceNucleusContextImpl) nc; + org.datanucleus.store.types.TypeManagerImpl tm = + (org.datanucleus.store.types.TypeManagerImpl) pnc.getTypeManager(); + Field clrField = tm.getClass().getDeclaredField("clr"); + clrField.setAccessible(true); + ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(tm); + clearClr(clr); + Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr"); + storeMgrField.setAccessible(true); + org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr = + (org.datanucleus.store.rdbms.RDBMSStoreManager) storeMgrField.get(pnc); + Field backingStoreField = + storeMgr.getClass().getDeclaredField("backingStoreByMemberName"); + backingStoreField.setAccessible(true); + Map<String, Store> backingStoreByMemberName = + (Map<String, Store>) backingStoreField.get(storeMgr); + for (Store store : backingStoreByMemberName.values()) { + org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore = + (org.datanucleus.store.rdbms.scostore.BaseContainerStore) store; + clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class + .getDeclaredField("clr"); + clrField.setAccessible(true); + clr = (ClassLoaderResolver) clrField.get(baseStore); + clearClr(clr); + } + } + Field classLoaderResolverMap = + AbstractNucleusContext.class.getDeclaredField("classLoaderResolverMap"); + classLoaderResolverMap.setAccessible(true); + Map<String, ClassLoaderResolver> loaderMap = + (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc); + for (ClassLoaderResolver clr : loaderMap.values()) { + clearClr(clr); + } + classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>()); + LOG.debug("Removed cached classloaders from DataNucleus NucleusContext"); + } catch (Exception e) { + LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext", e); + } + } finally { + pmfWriteLock.unlock(); + } + } + + private static void clearClr(ClassLoaderResolver clr) throws Exception { + if (clr != null) { + if (clr instanceof ClassLoaderResolverImpl) { + 
ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr; + long resourcesCleared = clearFieldMap(clri, "resources"); + long loadedClassesCleared = clearFieldMap(clri, "loadedClasses"); + long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses"); + LOG.debug("Cleared ClassLoaderResolverImpl: {}, {}, {}", resourcesCleared, + loadedClassesCleared, unloadedClassesCleared); + } + } + } + + private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName) + throws Exception { + Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName); + mapField.setAccessible(true); + + Map<String, Class> map = (Map<String, Class>) mapField.get(clri); + long sz = map.size(); + mapField.set(clri, Collections.synchronizedMap(new WeakValueMap())); + return sz; + } + + /** + * Creates a PersistenceManager instance for the current PersistenceManagerFactory. Note that this + * acquires a read-lock on PersistenceManagerFactory so that this method will block if any other + * thread is actively (re-)initializing PersistenceManagerFactory when this method is called. + * Note that this method throws a RuntimeException if the PersistenceManagerFactory is not yet initialized. + * + * @return PersistenceManager from the current PersistenceManagerFactory instance + */ + public static PersistenceManager getPersistenceManager() { + pmfReadLock.lock(); + try { + if (pmf == null) { + throw new RuntimeException( + "Cannot create PersistenceManager. PersistenceManagerFactory is not yet initialized"); + } + return retry(pmf::getPersistenceManager); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + pmfReadLock.unlock(); + } + } + + /** + * Properties specified in hive-default.xml override the properties specified + * in jpox.properties. + */ + @SuppressWarnings("nls") + private static Properties getDataSourceProps(Configuration conf) { + Properties prop = new Properties(); + correctAutoStartMechanism(conf); + + // First, go through and set all our values for datanucleus and javax.jdo parameters. This + // has to be a separate first step because we don't set the default values in the config object. + for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) { + String confVal = MetastoreConf.getAsString(conf, var); + String varName = var.getVarname(); + Object prevVal = prop.setProperty(varName, confVal); + if (MetastoreConf.isPrintable(varName)) { + LOG.debug("Overriding {} value {} from jpox.properties with {}", varName, prevVal, confVal); + } + } + + // Now, we need to look for any values that the user set that MetastoreConf doesn't know about. + // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly + // interpolated in case of variables. See HIVE-17788. + /* + for (Map.Entry<String, String> e : conf) { + if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) { + // We have to handle this differently depending on whether it is a value known to + // MetastoreConf or not. If it is, we need to get the default value if a value isn't + // provided. If not, we just set whatever the user has set. 
+ Object prevVal = prop.setProperty(e.getKey(), e.getValue()); + if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) { + LOG.debug("Overriding " + e.getKey() + " value " + prevVal + + " from jpox.properties with " + e.getValue()); + } + } + } + */ + + // Password may no longer be in the conf, use getPassword() + try { + String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD); + if (org.apache.commons.lang.StringUtils.isNotEmpty(passwd)) { + // We can get away with the use of varname here because varname == hiveName for PWD + prop.setProperty(ConfVars.PWD.getVarname(), passwd); + } + } catch (IOException err) { + throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err); + } + + if (LOG.isDebugEnabled()) { + for (Entry<Object, Object> e : prop.entrySet()) { + if (MetastoreConf.isPrintable(e.getKey().toString())) { + LOG.debug("{} = {}", e.getKey(), e.getValue()); + } + } + } + + return prop; + } + + /** + * Update conf to set datanucleus.autoStartMechanismMode=ignored. + * This is necessary to be able to use an older version of Hive against + * an upgraded but compatible metastore schema in the db from a newer version + * of Hive. + * + * @param conf + */ + private static void correctAutoStartMechanism(Configuration conf) { + final String autoStartKey = "datanucleus.autoStartMechanismMode"; + final String autoStartIgnore = "ignored"; + String currentAutoStartVal = conf.get(autoStartKey); + if (!autoStartIgnore.equalsIgnoreCase(currentAutoStartVal)) { + LOG.warn("{} is set to unsupported value {} . Setting it to value: {}", autoStartKey, + conf.get(autoStartKey), autoStartIgnore); + } + conf.set(autoStartKey, autoStartIgnore); + } + + /** + * Makes it possible to run multiple metastores in unit tests. + * + * @param twoMetastoreTesting if we are using multiple metastores in unit tests + */ + @VisibleForTesting + public static void setTwoMetastoreTesting(boolean twoMetastoreTesting) { + forTwoMetastoreTesting = twoMetastoreTesting; + } + + public static String getProperty(String key) { + return prop == null ? null : prop.getProperty(key); + } + + private static <T> T retry(Supplier<T> s) { + Exception ex = null; + int myRetryLimit = retryLimit; + while (myRetryLimit > 0) { + try { + return s.get(); + } catch (Exception e) { + myRetryLimit--; + boolean retriable = isRetriableException(e); + if (myRetryLimit > 0 && retriable) { + LOG.info("Retriable exception while invoking method, retrying. {} attempts left", + myRetryLimit, e); + try { + Thread.sleep(retryInterval); + } catch (InterruptedException ie) { + // Restore the interrupted status, since we do not want to catch it. + LOG.debug("Interrupted while sleeping before retrying.", ie); + Thread.currentThread().interrupt(); + } + // If we're here, we'll proceed down the next while loop iteration. + } else { + // we've reached our limit, throw the last one. 
+ if (retriable) { + LOG.warn("Exception retry limit reached, not retrying any longer.", e); + } else { + LOG.debug("Non-retriable exception.", e); + } + ex = e; + } + } + } + throw new RuntimeException(ex); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java index 2d9c229..23c7670 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java @@ -62,19 +62,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jdo.Query; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Random; import java.util.Set; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.Callable; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; @Category(MetastoreUnitTest.class) @@ -511,8 +517,8 @@ public class TestObjectStore { localConf.set(key1, value1); objectStore = new ObjectStore(); objectStore.setConf(localConf); - Assert.assertEquals(value, objectStore.getProp().getProperty(key)); - Assert.assertNull(objectStore.getProp().getProperty(key1)); + Assert.assertEquals(value, PersistenceManagerProvider.getProperty(key)); + Assert.assertNull(PersistenceManagerProvider.getProperty(key1)); } /** @@ -714,6 +720,52 @@ public class TestObjectStore { Assert.assertTrue("Expect no active transactions.", !objectStore.isActiveTransaction()); } + /** + * This test calls the ObjectStore.setConf method from multiple threads. Each thread uses its 
+ * own instance of ObjectStore to simulate thread-local ObjectStore behaviour. + * @throws Exception + */ + @Test + public void testConcurrentPMFInitialize() throws Exception { + final String dataSourceProp = "datanucleus.connectionPool.maxPoolSize"; + // Barrier is used to ensure that all threads start the race at the same time + final int numThreads = 10; + final int numIteration = 50; + final CyclicBarrier barrier = new CyclicBarrier(numThreads); + final AtomicInteger counter = new AtomicInteger(0); + ExecutorService executor = newFixedThreadPool(numThreads); + List<Future<Void>> results = new ArrayList<>(numThreads); + for (int i = 0; i < numThreads; i++) { + final Random random = new Random(); + Configuration conf = MetastoreConf.newMetastoreConf(); + MetaStoreTestUtils.setConfForStandloneMode(conf); + results.add(executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + // each thread gets its own ObjectStore to simulate threadLocal store + ObjectStore objectStore = new ObjectStore(); + barrier.await(); + for (int j = 0; j < numIteration; j++) { + // set connectionPool to a random value to increase the likelihood of pmf + // re-initialization + int randomNumber = random.nextInt(100); + if (randomNumber % 2 == 0) { + objectStore.setConf(conf); + } else { + Assert.assertNotNull(objectStore.getPersistenceManager()); + } + counter.getAndIncrement(); + } + return null; + } + })); + } + for (Future<Void> future : results) { + future.get(120, TimeUnit.SECONDS); + } + Assert.assertEquals("Unexpected number of setConf calls", numIteration * numThreads, + counter.get()); + } private void createTestCatalog(String catName) throws MetaException { Catalog cat = new CatalogBuilder()
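
The interesting part of this commit is the locking discipline in updatePmfProperties: the common path takes only a shared read lock, and a thread falls back to the exclusive write lock only when the incoming properties actually differ, re-checking after the upgrade and downgrading back to the read lock before releasing. Stripped of the metastore specifics, the pattern looks roughly like the sketch below (hypothetical CachedFactory/update names, not the committed code; ReentrantReadWriteLock cannot upgrade a held read lock in place, hence the release, re-check, downgrade sequence):

import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CachedFactory {
  private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();
  private static final Lock READ = LOCK.readLock();
  private static final Lock WRITE = LOCK.writeLock();
  private static Properties current;        // guarded by LOCK
  private static Object expensiveSingleton; // stands in for the pmf

  public static void update(Properties incoming) {
    READ.lock();
    boolean readHeld = true;
    try {
      if (!incoming.equals(current)) {
        // Cannot upgrade read -> write; release the read lock first.
        READ.unlock();
        readHeld = false;
        WRITE.lock();
        try {
          // Re-check: another thread may have re-initialized while we waited.
          if (!incoming.equals(current)) {
            current = incoming;
            expensiveSingleton = new Object(); // re-create the shared resource
          }
          // Downgrade: take the read lock before releasing the write lock so
          // the state we just installed cannot change before we observe it.
          READ.lock();
          readHeld = true;
        } finally {
          WRITE.unlock();
        }
      }
      // Common path: read expensiveSingleton here, concurrently with other readers.
    } finally {
      if (readHeld) {
        READ.unlock();
      }
    }
  }
}

Taking the read lock before releasing the write lock (the downgrade) guarantees the updating thread still observes the factory it just installed; it is also why the committed code tracks readLockAcquired explicitly, since the JDO calls made under the locks can throw unchecked JDOException.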
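Both initPMF and getPersistenceManager funnel through the private retry(Supplier) helper. The following is a self-contained sketch of the same idea, with hypothetical RetrySketch/withRetry names and one deliberate simplification relative to the committed loop: non-retriable failures are rethrown immediately instead of being re-attempted.

import java.util.function.Supplier;
import javax.jdo.JDOCanRetryException;

public final class RetrySketch {
  private RetrySketch() {
  }

  /** Runs the supplier, retrying retriable failures with a fixed sleep between attempts. */
  public static <T> T withRetry(Supplier<T> action, int maxAttempts, long sleepMillis) {
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        last = e;
        if (attempt == maxAttempts || !isRetriable(e)) {
          break; // out of attempts, or not worth retrying
        }
        try {
          Thread.sleep(sleepMillis);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // restore status and give up
          break;
        }
      }
    }
    throw last;
  }

  private static boolean isRetriable(Throwable e) {
    // Walk the cause chain, as isRetriableException does for JDOCanRetryException.
    for (Throwable t = e; t != null; t = t.getCause()) {
      if (t instanceof JDOCanRetryException) {
        return true;
      }
    }
    return false;
  }
}

Under these assumptions, getPersistenceManager() would reduce to something like withRetry(pmf::getPersistenceManager, retryLimit, retryInterval), executed while holding the read lock.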