Author: kevinwilfong Date: Mon Oct 1 18:14:02 2012 New Revision: 1392491 URL: http://svn.apache.org/viewvc?rev=1392491&view=rev Log: HIVE-3484. RetryingRawStore logic needs to be significantly reworked to support retries within transactions (Jean Xu via kevinwilfong)
Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IHMSHandler.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreInit.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java hive/trunk/conf/hive-default.xml.template hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1392491&r1=1392490&r2=1392491&view=diff ============================================================================== --- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Oct 1 18:14:02 2012 @@ -131,6 +131,9 @@ public class HiveConf extends Configurat HiveConf.ConfVars.METASTORE_PART_INHERIT_TBL_PROPS, HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX, HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS, + HiveConf.ConfVars.HMSHANDLERATTEMPTS, + HiveConf.ConfVars.HMSHANDLERINTERVAL, + HiveConf.ConfVars.HMSHANDLERFORCERELOADCONF, }; /** @@ -257,9 +260,18 @@ public class HiveConf extends Configurat METASTOREINTERVAL("hive.metastore.ds.retry.interval", 1000), // Whether to force reloading of the metastore configuration (including // the connection URL, before the next metastore query that accesses the - // datastore. Once reloaded, the this value is reset to false. Used for + // datastore. Once reloaded, this value is reset to false. Used for // testing only. 
METASTOREFORCERELOADCONF("hive.metastore.force.reload.conf", false), + // Number of attempts to retry connecting after there is a JDO datastore error + HMSHANDLERATTEMPTS("hive.hmshandler.retry.attempts", 1), + // Number of milliseconds to wait between attempts + HMSHANDLERINTERVAL("hive.hmshandler.retry.interval", 1000), + // Whether to force reloading of the HMSHandler configuration (including + // the connection URL, before the next metastore query that accesses the + // datastore. Once reloaded, this value is reset to false. Used for + // testing only. + HMSHANDLERFORCERELOADCONF("hive.hmshandler.force.reload.conf", false), METASTORESERVERMINTHREADS("hive.metastore.server.min.threads", 200), METASTORESERVERMAXTHREADS("hive.metastore.server.max.threads", 100000), METASTORE_TCP_KEEP_ALIVE("hive.metastore.server.tcp.keepalive", true), Modified: hive/trunk/conf/hive-default.xml.template URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1392491&r1=1392490&r2=1392491&view=diff ============================================================================== --- hive/trunk/conf/hive-default.xml.template (original) +++ hive/trunk/conf/hive-default.xml.template Mon Oct 1 18:14:02 2012 @@ -1465,5 +1465,17 @@ </description> </property> +<property> + <name>hive.hmshandler.retry.attempts</name> + <value>1</value> + <description>The number of times to retry a HMSHandler call if there were a connection error</description> +</property> + +<property> + <name>hive.hmshandler.retry.interval</name> + <value>1000</value> + <description>The number of milliseconds between HMSHandler retry attempts</description> +</property> + </configuration> Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1392491&r1=1392490&r2=1392491&view=diff
============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Mon Oct 1 18:14:02 2012 @@ -166,7 +166,7 @@ public class HiveMetaStore extends Thrif } public static class HMSHandler extends FacebookBase implements - ThriftHiveMetastore.Iface { + IHMSHandler { public static final Log LOG = HiveMetaStore.LOG; private static boolean createDefaultDB = false; private String rawStoreClassName; @@ -342,6 +342,14 @@ public class HiveMetaStore extends Thrif return threadLocalId.get() + ": " + s; } + public void setConf(Configuration conf) { + threadLocalConf.set(conf); + RawStore ms = threadLocalMS.get(); + if (ms != null) { + ms.setConf(conf); + } + } + private Configuration getConf() { Configuration conf = threadLocalConf.get(); if (conf == null) { @@ -366,7 +374,6 @@ public class HiveMetaStore extends Thrif threadLocalMS.set(ms); ms = threadLocalMS.get(); } - return ms; } @@ -3185,6 +3192,10 @@ public class HiveMetaStore extends Thrif } } + public static IHMSHandler newHMSHandler(String name, HiveConf hiveConf) throws MetaException { + return RetryingHMSHandler.getProxy(hiveConf, name); + } + /** * Discard a current delegation token. @@ -3377,12 +3388,12 @@ public class HiveMetaStore extends Thrif // start delegation token manager saslServer.startDelegationTokenSecretManager(conf); transFactory = saslServer.createTransportFactory(); - processor = saslServer.wrapProcessor(new ThriftHiveMetastore.Processor<HMSHandler>( - new HMSHandler("new db based metaserver", conf))); + processor = saslServer.wrapProcessor(new ThriftHiveMetastore.Processor<IHMSHandler>( + newHMSHandler("new db based metaserver", conf))); LOG.info("Starting DB backed MetaStore Server in Secure Mode"); } else { // we are in unsecure mode. 
- HMSHandler handler = new HMSHandler("new db based metaserver", conf); + IHMSHandler handler = newHMSHandler("new db based metaserver", conf); if (conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) { transFactory = useFramedTransport ? @@ -3390,12 +3401,12 @@ public class HiveMetaStore extends Thrif new TUGIContainingTransport.Factory()) : new TUGIContainingTransport.Factory(); - processor = new TUGIBasedProcessor<HMSHandler>(handler); + processor = new TUGIBasedProcessor<IHMSHandler>(handler); LOG.info("Starting DB backed MetaStore Server with SetUGI enabled"); } else { transFactory = useFramedTransport ? new TFramedTransport.Factory() : new TTransportFactory(); - processor = new TSetIpAddressProcessor<HMSHandler>(handler); + processor = new TSetIpAddressProcessor<IHMSHandler>(handler); LOG.info("Starting DB backed MetaStore Server"); } } Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java?rev=1392491&r1=1392490&r2=1392491&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java Mon Oct 1 18:14:02 2012 @@ -113,7 +113,7 @@ public class HiveMetaStoreClient impleme if (localMetaStore) { // instantiate the metastore server handler directly instead of connecting // through the network - client = new HiveMetaStore.HMSHandler("hive client", conf); + client = HiveMetaStore.newHMSHandler("hive client", conf); isConnected = true; return; } Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IHMSHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IHMSHandler.java?rev=1392491&view=auto 
============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IHMSHandler.java (added) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IHMSHandler.java Mon Oct 1 18:14:02 2012 @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; + +public interface IHMSHandler extends ThriftHiveMetastore.Iface { + + public abstract void setConf(Configuration conf); +} Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreInit.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreInit.java?rev=1392491&view=auto ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreInit.java (added) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreInit.java Mon Oct 1 18:14:02 2012 @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * MetaStoreInit defines functions to init/update MetaStore connection url. + * + */ +public class MetaStoreInit { + + private static final Log LOG = LogFactory.getLog(MetaStoreInit.class); + + static class MetaStoreInitData { + JDOConnectionURLHook urlHook = null; + String urlHookClassName = ""; + } + + /** + * Updates the connection URL in hiveConf using the hook + * + * @return true if a new connection URL was loaded into the thread local + * configuration + */ + static boolean updateConnectionURL(HiveConf hiveConf, Configuration conf, + String badUrl, MetaStoreInitData updateData) + throws MetaException { + String connectUrl = null; + String currentUrl = MetaStoreInit.getConnectionURL(conf); + try { + // We always call init because the hook name in the configuration could + // have changed. 
+ MetaStoreInit.initConnectionUrlHook(hiveConf, updateData); + if (updateData.urlHook != null) { + if (badUrl != null) { + updateData.urlHook.notifyBadConnectionUrl(badUrl); + } + connectUrl = updateData.urlHook.getJdoConnectionUrl(hiveConf); + } + } catch (Exception e) { + LOG.error("Exception while getting connection URL from the hook: " + + e); + } + + if (connectUrl != null && !connectUrl.equals(currentUrl)) { + LOG.error( + String.format("Overriding %s with %s", + HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), + connectUrl)); + conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), + connectUrl); + return true; + } + return false; + } + + static String getConnectionURL(Configuration conf) { + return conf.get( + HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), ""); + } + + // Multiple threads could try to initialize at the same time. + synchronized private static void initConnectionUrlHook(HiveConf hiveConf, + MetaStoreInitData updateData) throws ClassNotFoundException { + + String className = + hiveConf.get(HiveConf.ConfVars.METASTORECONNECTURLHOOK.toString(), "").trim(); + if (className.equals("")) { + updateData.urlHookClassName = ""; + updateData.urlHook = null; + return; + } + boolean urlHookChanged = !updateData.urlHookClassName.equals(className); + if (updateData.urlHook == null || urlHookChanged) { + updateData.urlHookClassName = className.trim(); + + Class<?> urlHookClass = Class.forName(updateData.urlHookClassName, true, + JavaUtils.getClassLoader()); + updateData.urlHook = (JDOConnectionURLHook) ReflectionUtils.newInstance(urlHookClass, null); + } + return; + } +} Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java?rev=1392491&view=auto ============================================================================== --- 
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java (added) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java Mon Oct 1 18:14:02 2012 @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook; +import org.apache.hadoop.util.ReflectionUtils; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RetryingHMSHandler implements InvocationHandler { + + private static final Log LOG = LogFactory.getLog(RetryingHMSHandler.class); + + private final IHMSHandler base; + private MetaStoreInit.MetaStoreInitData metaStoreInitData = + new MetaStoreInit.MetaStoreInitData(); + private final HiveConf hiveConf; + + protected RetryingHMSHandler(HiveConf hiveConf, String name) throws MetaException { + this.hiveConf = hiveConf; + + // This has to be called before initializing the instance of HMSHandler + init(); + + this.base = (IHMSHandler) new HiveMetaStore.HMSHandler(name, hiveConf); + } + + public static IHMSHandler getProxy(HiveConf hiveConf, String name) throws MetaException { + + RetryingHMSHandler handler = new RetryingHMSHandler(hiveConf, name); + + return (IHMSHandler) Proxy.newProxyInstance( + RetryingHMSHandler.class.getClassLoader(), + new Class[] { IHMSHandler.class }, handler); + } + + private void init() throws MetaException { + // Using the hook on startup ensures that the hook always has priority + // over settings in *.xml. 
The thread local conf needs to be used because at this point + // it has already been initialized using hiveConf. + MetaStoreInit.updateConnectionURL(hiveConf, getConf(), null, metaStoreInitData); + + } + + private void initMS() { + base.setConf(getConf()); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Object ret = null; + + boolean gotNewConnectUrl = false; + boolean reloadConf = HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HMSHANDLERFORCERELOADCONF); + int retryInterval = HiveConf.getIntVar(hiveConf, + HiveConf.ConfVars.HMSHANDLERINTERVAL); + int retryLimit = HiveConf.getIntVar(hiveConf, + HiveConf.ConfVars.HMSHANDLERATTEMPTS); + + if (reloadConf) { + MetaStoreInit.updateConnectionURL(hiveConf, getConf(), + null, metaStoreInitData); + } + + int retryCount = 0; + // Exception caughtException = null; + Throwable caughtException = null; + while (true) { + try { + if (reloadConf || gotNewConnectUrl) { + initMS(); + } + ret = method.invoke(base, args); + break; + } catch (javax.jdo.JDOException e) { + caughtException = e; + } catch (UndeclaredThrowableException e) { + if (e.getCause() != null) { + if (e.getCause() instanceof javax.jdo.JDOException) { + // Due to reflection, the jdo exception is wrapped in + // invocationTargetException + caughtException = e.getCause(); + } + else { + throw e.getCause(); + } + } + else { + throw e; + } + } catch (InvocationTargetException e) { + if (e.getCause() instanceof javax.jdo.JDOException) { + // Due to reflection, the jdo exception is wrapped in + // invocationTargetException + caughtException = e.getCause(); + } + else { + throw e.getCause(); + } + } + + if (retryCount >= retryLimit) { + throw caughtException; + } + + assert (retryInterval >= 0); + retryCount++; + LOG.error( + String.format( + "JDO datastore error. 
Retrying HMSHandler " + + "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit)); + Thread.sleep(retryInterval); + // If we have a connection error, the JDO connection URL hook might + // provide us with a new URL to access the datastore. + String lastUrl = MetaStoreInit.getConnectionURL(getConf()); + gotNewConnectUrl = MetaStoreInit.updateConnectionURL(hiveConf, getConf(), + lastUrl, metaStoreInitData); + } + return ret; + } + + public Configuration getConf() { + return hiveConf; + } +} Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java?rev=1392491&r1=1392490&r2=1392491&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java Mon Oct 1 18:14:02 2012 @@ -44,8 +44,8 @@ public class RetryingRawStore implements private final RawStore base; private int retryInterval = 0; private int retryLimit = 0; - private JDOConnectionURLHook urlHook = null; - private String urlHookClassName = ""; + private MetaStoreInit.MetaStoreInitData metaStoreInitData = + new MetaStoreInit.MetaStoreInitData(); private final int id; private final HiveConf hiveConf; private final Configuration conf; // thread local conf from HMS @@ -82,7 +82,7 @@ public class RetryingRawStore implements // Using the hook on startup ensures that the hook always has priority // over settings in *.xml. The thread local conf needs to be used because at this point // it has already been initialized using hiveConf. 
- updateConnectionURL(getConf(), null); + MetaStoreInit.updateConnectionURL(hiveConf, getConf(), null, metaStoreInitData); } private void initMS() { @@ -98,7 +98,7 @@ public class RetryingRawStore implements HiveConf.ConfVars.METASTOREFORCERELOADCONF); if (reloadConf) { - updateConnectionURL(getConf(), null); + MetaStoreInit.updateConnectionURL(hiveConf, getConf(), null, metaStoreInitData); } int retryCount = 0; @@ -138,76 +138,13 @@ public class RetryingRawStore implements Thread.sleep(retryInterval); // If we have a connection error, the JDO connection URL hook might // provide us with a new URL to access the datastore. - String lastUrl = getConnectionURL(getConf()); - gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl); + String lastUrl = MetaStoreInit.getConnectionURL(getConf()); + gotNewConnectUrl = MetaStoreInit.updateConnectionURL(hiveConf, getConf(), + lastUrl, metaStoreInitData); } return ret; } - /** - * Updates the connection URL in hiveConf using the hook - * - * @return true if a new connection URL was loaded into the thread local - * configuration - */ - private boolean updateConnectionURL(Configuration conf, String badUrl) - throws MetaException { - String connectUrl = null; - String currentUrl = getConnectionURL(conf); - try { - // We always call init because the hook name in the configuration could - // have changed. 
- initConnectionUrlHook(); - if (urlHook != null) { - if (badUrl != null) { - urlHook.notifyBadConnectionUrl(badUrl); - } - connectUrl = urlHook.getJdoConnectionUrl(hiveConf); - } - } catch (Exception e) { - LOG.error("Exception while getting connection URL from the hook: " + - e); - } - - if (connectUrl != null && !connectUrl.equals(currentUrl)) { - LOG.error(addPrefix( - String.format("Overriding %s with %s", - HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), - connectUrl))); - conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), - connectUrl); - return true; - } - return false; - } - - private static String getConnectionURL(Configuration conf) { - return conf.get( - HiveConf.ConfVars.METASTORECONNECTURLKEY.toString(), ""); - } - - // Multiple threads could try to initialize at the same time. - synchronized private void initConnectionUrlHook() - throws ClassNotFoundException { - - String className = - hiveConf.get(HiveConf.ConfVars.METASTORECONNECTURLHOOK.toString(), "").trim(); - if (className.equals("")) { - urlHookClassName = ""; - urlHook = null; - return; - } - boolean urlHookChanged = !urlHookClassName.equals(className); - if (urlHook == null || urlHookChanged) { - urlHookClassName = className.trim(); - - Class<?> urlHookClass = Class.forName(urlHookClassName, true, - JavaUtils.getClassLoader()); - urlHook = (JDOConnectionURLHook) ReflectionUtils.newInstance(urlHookClass, null); - } - return; - } - private String addPrefix(String s) { return id + ": " + s; }