Repository: hive Updated Branches: refs/heads/master 80fb89131 -> 4b444082f
HIVE-10521 - TxnHandler.timeOutTxns only times out some of the expired transactions (Alan Gates via Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4b444082 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4b444082 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4b444082 Branch: refs/heads/master Commit: 4b444082fcae9eb8ea60ec160723a0337ead1852 Parents: 80fb891 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Wed May 6 19:36:48 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Wed May 6 19:36:48 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/metastore/txn/TxnHandler.java | 35 ++++++++++++------ .../hive/metastore/txn/TestTxnHandler.java | 39 +++++++++++++++----- 2 files changed, 53 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4b444082/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 704c3ed..7c3b55c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -75,6 +75,7 @@ public class TxnHandler { static final protected char LOCK_SEMI_SHARED = 'w'; static final private int ALLOWED_REPEATED_DEADLOCKS = 10; + static final private int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100; static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); static private DataSource connPool; @@ -130,7 +131,8 @@ public class TxnHandler { timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); 
deadlockCnt = 0; buildJumpTable(); - retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS); + retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, + TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); deadlockRetryInterval = retryInterval / 10; @@ -334,9 +336,7 @@ public class TxnHandler { Connection dbConn = null; try { dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - List<Long> txnids = new ArrayList<Long>(1); - txnids.add(txnid); - if (abortTxns(dbConn, txnids) != 1) { + if (abortTxns(dbConn, Collections.singletonList(txnid)) != 1) { LOG.debug("Going to rollback"); dbConn.rollback(); throw new NoSuchTxnException("No such transaction: " + txnid); @@ -1321,8 +1321,6 @@ public class TxnHandler { LOG.debug("Going to execute update <" + buf.toString() + ">"); updateCnt = stmt.executeUpdate(buf.toString()); - LOG.debug("Going to commit"); - dbConn.commit(); } finally { closeStmt(stmt); } @@ -1818,10 +1816,10 @@ public class TxnHandler { } } - // Abort timed out transactions. This calls abortTxn(), which does a commit, + // Abort timed out transactions. This does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions on the underlying database. 
- private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { + private void timeOutTxns(Connection dbConn) throws SQLException, MetaException, RetryException { long now = getDbTime(dbConn); Statement stmt = null; try { @@ -1834,10 +1832,23 @@ public class TxnHandler { List<Long> deadTxns = new ArrayList<Long>(); // Limit the number of timed out transactions we do in one pass to keep from generating a // huge delete statement - for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1)); - // We don't care whether all of the transactions get deleted or not, - // if some didn't it most likely means someone else deleted them in the interum - if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + do { + deadTxns.clear(); + for (int i = 0; i < TIMED_OUT_TXN_ABORT_BATCH_SIZE && rs.next(); i++) { + deadTxns.add(rs.getLong(1)); + } + // We don't care whether all of the transactions get deleted or not, + // if some didn't it most likely means someone else deleted them in the interim + if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns); + } while (deadTxns.size() > 0); + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxn"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); } http://git-wip-us.apache.org/repos/asf/hive/blob/4b444082/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index d4266e1..f478184 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ 
-937,16 +937,16 @@ public class TestTxnHandler { @Test public void testLockTimeout() throws Exception { long timeout = txnHandler.setTimeout(1); - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lock(req); - assertTrue(res.getState() == LockState.ACQUIRED); - Thread.currentThread().sleep(10); try { + LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition"); + List<LockComponent> components = new ArrayList<LockComponent>(1); + components.add(comp); + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + assertTrue(res.getState() == LockState.ACQUIRED); + Thread.currentThread().sleep(10); txnHandler.checkLock(new CheckLockRequest(res.getLockid())); fail("Told there was a lock, when it should have timed out."); } catch (NoSuchLockException e) { @@ -956,6 +956,27 @@ public class TestTxnHandler { } @Test + public void testRecoverManyTimeouts() throws Exception { + long timeout = txnHandler.setTimeout(1); + try { + txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost")); + Thread.currentThread().sleep(10); + txnHandler.getOpenTxns(); + GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo(); + int numAborted = 0; + for (TxnInfo txnInfo : rsp.getOpen_txns()) { + assertEquals(TxnState.ABORTED, txnInfo.getState()); + numAborted++; + } + assertEquals(503, numAborted); + } finally { + txnHandler.setTimeout(timeout); + } + + + } + + @Test public void testHeartbeatNoLock() throws Exception { HeartbeatRequest h = new HeartbeatRequest(); h.setLockid(29389839L);