Repository: hive
Updated Branches:
  refs/heads/master 80fb89131 -> 4b444082f


HIVE-10521 - TxnHandler.timeOutTxns only times out some of the expired transactions (Alan Gates via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4b444082
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4b444082
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4b444082

Branch: refs/heads/master
Commit: 4b444082fcae9eb8ea60ec160723a0337ead1852
Parents: 80fb891
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Wed May 6 19:36:48 2015 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Wed May 6 19:36:48 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 35 ++++++++++++------
 .../hive/metastore/txn/TestTxnHandler.java      | 39 +++++++++++++++-----
 2 files changed, 53 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4b444082/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 704c3ed..7c3b55c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -75,6 +75,7 @@ public class TxnHandler {
   static final protected char LOCK_SEMI_SHARED = 'w';
 
   static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
+  static final private int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100;
   static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
 
   static private DataSource connPool;
@@ -130,7 +131,8 @@ public class TxnHandler {
     timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
     deadlockCnt = 0;
     buildJumpTable();
-    retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS);
+    retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL,
+        TimeUnit.MILLISECONDS);
     retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
 
@@ -334,9 +336,7 @@ public class TxnHandler {
       Connection dbConn = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-        List<Long> txnids = new ArrayList<Long>(1);
-        txnids.add(txnid);
-        if (abortTxns(dbConn, txnids) != 1) {
+        if (abortTxns(dbConn, Collections.singletonList(txnid)) != 1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
           throw new NoSuchTxnException("No such transaction: " + txnid);
@@ -1321,8 +1321,6 @@ public class TxnHandler {
       LOG.debug("Going to execute update <" + buf.toString() + ">");
       updateCnt = stmt.executeUpdate(buf.toString());
 
-      LOG.debug("Going to commit");
-      dbConn.commit();
     } finally {
       closeStmt(stmt);
     }
@@ -1818,10 +1816,10 @@ public class TxnHandler {
     }
   }
 
-  // Abort timed out transactions.  This calls abortTxn(), which does a commit,
+  // Abort timed out transactions.  This does a commit,
   // and thus should be done before any calls to heartbeat that will leave
   // open transactions on the underlying database.
-  private void timeOutTxns(Connection dbConn) throws SQLException, MetaException {
+  private void timeOutTxns(Connection dbConn) throws SQLException, MetaException, RetryException {
     long now = getDbTime(dbConn);
     Statement stmt = null;
     try {
@@ -1834,10 +1832,23 @@ public class TxnHandler {
       List<Long> deadTxns = new ArrayList<Long>();
       // Limit the number of timed out transactions we do in one pass to keep from generating a
       // huge delete statement
-      for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1));
-      // We don't care whether all of the transactions get deleted or not,
-      // if some didn't it most likely means someone else deleted them in the interum
-      if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns);
+      do {
+        deadTxns.clear();
+        for (int i = 0; i <  TIMED_OUT_TXN_ABORT_BATCH_SIZE && rs.next(); i++) {
+          deadTxns.add(rs.getLong(1));
+        }
+        // We don't care whether all of the transactions get deleted or not,
+        // if some didn't it most likely means someone else deleted them in the interum
+        if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns);
+      } while (deadTxns.size() > 0);
+      LOG.debug("Going to commit");
+      dbConn.commit();
+    } catch (SQLException e) {
+      LOG.debug("Going to rollback");
+      rollbackDBConn(dbConn);
+      checkRetryable(dbConn, e, "abortTxn");
+      throw new MetaException("Unable to update transaction database "
+        + StringUtils.stringifyException(e));
     } finally {
       closeStmt(stmt);
     }
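
For context on the hunk above: the old code collected at most 20 expired transactions, issued a single abortTxns() call, and stopped, so any further expired transactions stayed open until some later pass. The new do/while loop drains the query in batches of TIMED_OUT_TXN_ABORT_BATCH_SIZE (100) until the result set is exhausted, commits once at the end, and rolls back and goes through the retry path on SQLException. Below is a minimal standalone sketch of that batching pattern; the class name, the abortBatch() helper, and the SQL text are hypothetical stand-ins, not the actual TxnHandler code or schema.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

public class BatchAbortSketch {
  private static final int BATCH_SIZE = 100;   // mirrors TIMED_OUT_TXN_ABORT_BATCH_SIZE

  // Drain every expired transaction id in fixed-size batches so that no
  // single abort statement grows unboundedly and nothing is left behind.
  static void abortExpired(Connection dbConn, long cutoffMillis) throws SQLException {
    Statement stmt = dbConn.createStatement();
    try {
      // Hypothetical query shape; real table/column names may differ.
      ResultSet rs = stmt.executeQuery(
          "select txn_id from TXNS where txn_state = 'o' and txn_last_heartbeat < " + cutoffMillis);
      List<Long> batch = new ArrayList<Long>(BATCH_SIZE);
      do {
        batch.clear();
        for (int i = 0; i < BATCH_SIZE && rs.next(); i++) {
          batch.add(rs.getLong(1));
        }
        if (!batch.isEmpty()) {
          abortBatch(dbConn, batch);   // stand-in for TxnHandler.abortTxns(dbConn, deadTxns)
        }
      } while (!batch.isEmpty());
      dbConn.commit();                 // one commit after the whole drain, as in the patch
    } catch (SQLException e) {
      dbConn.rollback();               // the patch additionally routes through its retry logic here
      throw e;
    } finally {
      stmt.close();
    }
  }

  // Placeholder: the real abortTxns() builds a single batched update/delete statement.
  private static void abortBatch(Connection dbConn, List<Long> txnIds) throws SQLException {
  }
}
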

http://git-wip-us.apache.org/repos/asf/hive/blob/4b444082/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index d4266e1..f478184 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -937,16 +937,16 @@ public class TestTxnHandler {
   @Test
   public void testLockTimeout() throws Exception {
     long timeout = txnHandler.setTimeout(1);
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    Thread.currentThread().sleep(10);
     try {
+      LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+      comp.setTablename("mytable");
+      comp.setPartitionname("mypartition");
+      List<LockComponent> components = new ArrayList<LockComponent>(1);
+      components.add(comp);
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      LockResponse res = txnHandler.lock(req);
+      assertTrue(res.getState() == LockState.ACQUIRED);
+      Thread.currentThread().sleep(10);
       txnHandler.checkLock(new CheckLockRequest(res.getLockid()));
       fail("Told there was a lock, when it should have timed out.");
     } catch (NoSuchLockException e) {
@@ -956,6 +956,27 @@ public class TestTxnHandler {
   }
 
   @Test
+  public void testRecoverManyTimeouts() throws Exception {
+    long timeout = txnHandler.setTimeout(1);
+    try {
+      txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost"));
+      Thread.currentThread().sleep(10);
+      txnHandler.getOpenTxns();
+      GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo();
+      int numAborted = 0;
+      for (TxnInfo txnInfo : rsp.getOpen_txns()) {
+        assertEquals(TxnState.ABORTED, txnInfo.getState());
+        numAborted++;
+      }
+      assertEquals(503, numAborted);
+    } finally {
+      txnHandler.setTimeout(timeout);
+    }
+
+
+  }
+
+  @Test
   public void testHeartbeatNoLock() throws Exception {
     HeartbeatRequest h = new HeartbeatRequest();
     h.setLockid(29389839L);
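
A quick note on the numbers in testRecoverManyTimeouts above (our reading of the test, not documented intent): 503 open transactions is far above the old per-pass cap of 20 and does not divide evenly into the new batch size, so a single timeout pass has to run several full batches plus a partial one to abort them all. A small sketch of that arithmetic, with the 503/100 figures taken from the diff:

public class TimeoutBatchMath {
  public static void main(String[] args) {
    int openTxns = 503;       // number of transactions the test opens and lets expire
    int batchSize = 100;      // TIMED_OUT_TXN_ABORT_BATCH_SIZE
    int passes = (openTxns + batchSize - 1) / batchSize;   // 6 iterations of the do/while loop
    int lastBatch = openTxns % batchSize;                  // final batch aborts just 3 transactions
    System.out.println(passes + " batches, last one of size " + lastBatch);
  }
}
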
