Repository: hive Updated Branches: refs/heads/master 5a5f8e49f -> 89703e7d0
HIVE-12266 When client exits abnormally, it doesn't release ACID locks (Wei Zheng, via Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/595fa998 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/595fa998 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/595fa998 Branch: refs/heads/master Commit: 595fa9988fcb3e67b60845b44e1df4cc49ce38b2 Parents: 5a5f8e4 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Tue Nov 3 09:03:54 2015 -0800 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Tue Nov 3 09:03:54 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hive/ql/Driver.java | 43 +++++++++++++++----- 1 file changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/595fa998/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 18052f3..93c7a54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -121,12 +121,14 @@ import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; +import org.apache.hive.common.util.ShutdownHookManager; public class Driver implements CommandProcessor { static final private String CLASS_NAME = Driver.class.getName(); private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private LogHelper console = new LogHelper(LOG); + static final int SHUTDOWN_HOOK_PRIORITY = 0; private int maxRows = 100; ByteStream.Output bos = new ByteStream.Output(); @@ -390,7 +392,20 @@ public class Driver 
implements CommandProcessor { try { // Initialize the transaction manager. This must be done before analyze is called. - SessionState.get().initTxnMgr(conf); + final HiveTxnManager txnManager = SessionState.get().initTxnMgr(conf); + // In case when user Ctrl-C twice to kill Hive CLI JVM, we want to release locks + ShutdownHookManager.addShutdownHook( + new Runnable() { + @Override + public void run() { + try { + releaseLocksAndCommitOrRollback(false, txnManager); + } catch (LockException e) { + LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " + + e.getMessage()); + } + } + }, SHUTDOWN_HOOK_PRIORITY); command = new VariableSubstitution(new HiveVariableSource() { @Override @@ -537,7 +552,7 @@ public class Driver implements CommandProcessor { * * @param sem semantic analyzer for analyzed query * @param plan query plan - * @param astStringTree AST tree dump + * @param astTree AST tree dump * @throws java.io.IOException */ private String getExplainOutput(BaseSemanticAnalyzer sem, QueryPlan plan, @@ -1049,15 +1064,21 @@ public class Driver implements CommandProcessor { /** * @param commit if there is an open transaction and if true, commit, * if false rollback. If there is no open transaction this parameter is ignored. 
+ * @param txnManager an optional existing transaction manager retrieved earlier from the session * **/ - private void releaseLocksAndCommitOrRollback(boolean commit) + private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager) throws LockException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS); - SessionState ss = SessionState.get(); - HiveTxnManager txnMgr = ss.getTxnMgr(); + HiveTxnManager txnMgr; + if (txnManager == null) { + SessionState ss = SessionState.get(); + txnMgr = ss.getTxnMgr(); + } else { + txnMgr = txnManager; + } // If we've opened a transaction we need to commit or rollback rather than explicitly // releasing the locks. if (txnMgr.isTxnOpen()) { @@ -1206,7 +1227,7 @@ public class Driver implements CommandProcessor { } if (ret != 0) { try { - releaseLocksAndCommitOrRollback(false); + releaseLocksAndCommitOrRollback(false, null); } catch (LockException e) { LOG.warn("Exception in releasing locks. 
" + org.apache.hadoop.util.StringUtils.stringifyException(e)); @@ -1287,7 +1308,7 @@ public class Driver implements CommandProcessor { if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) { /*here, if there is an open txn, we want to commit it; this behavior matches * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/ - releaseLocksAndCommitOrRollback(true); + releaseLocksAndCommitOrRollback(true, null); txnManager.setAutoCommit(true); } else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) { @@ -1315,10 +1336,10 @@ public class Driver implements CommandProcessor { //if needRequireLock is false, the release here will do nothing because there is no lock try { if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) { - releaseLocksAndCommitOrRollback(true); + releaseLocksAndCommitOrRollback(true, null); } else if(plan.getOperation() == HiveOperation.ROLLBACK) { - releaseLocksAndCommitOrRollback(false); + releaseLocksAndCommitOrRollback(false, null); } else { //txn (if there is one started) is not finished @@ -1349,7 +1370,7 @@ public class Driver implements CommandProcessor { private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { //console.printError(cpr.toString()); try { - releaseLocksAndCommitOrRollback(false); + releaseLocksAndCommitOrRollback(false, null); } catch (LockException e) { LOG.error("rollback() FAILED: " + cpr);//make sure not to loose @@ -1897,7 +1918,7 @@ public class Driver implements CommandProcessor { destroyed = true; if (!hiveLocks.isEmpty()) { try { - releaseLocksAndCommitOrRollback(false); + releaseLocksAndCommitOrRollback(false, null); } catch (LockException e) { LOG.warn("Exception when releasing locking in destroy: " + e.getMessage());