Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1652558&r1=1652557&r2=1652558&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Sat Jan 17 02:54:40 2015 @@ -77,7 +77,7 @@ public class TxnHandler { static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); static private DataSource connPool; - private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock + private final static Object lockLock = new Object(); // Random object to lock on for the lock // method /** @@ -87,10 +87,13 @@ public class TxnHandler { protected HiveConf conf; protected DatabaseProduct dbProduct; - // Transaction timeout, in milliseconds. + // (End user) Transaction timeout, in milliseconds. private long timeout; private String identifierQuoteString; // quotes to use for quoting tables, where necessary + private final long retryInterval; + private final int retryLimit; + private int retryNum; // DEADLOCK DETECTION AND HANDLING // A note to developers of this class. ALWAYS access HIVE_LOCKS before TXNS to avoid deadlock @@ -125,113 +128,122 @@ public class TxnHandler { timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); deadlockCnt = 0; buildJumpTable(); + retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS); + retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); + } public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - Statement stmt = null; try { - stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + "initialized, null record found in next_txn_id"); - } - - List<TxnInfo> txnInfo = new ArrayList<TxnInfo>(); - s = "select txn_id, txn_state, txn_user, txn_host from TXNS"; - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - char c = rs.getString(2).charAt(0); - TxnState state; - switch (c) { - case TXN_ABORTED: - state = TxnState.ABORTED; - break; + } - case TXN_OPEN: - state = TxnState.OPEN; - break; + List<TxnInfo> txnInfo = new ArrayList<TxnInfo>(); + s = "select txn_id, txn_state, txn_user, txn_host from TXNS"; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + char c = rs.getString(2).charAt(0); + TxnState state; + switch (c) { + case TXN_ABORTED: + state = TxnState.ABORTED; + break; + + case TXN_OPEN: + state = TxnState.OPEN; + break; - default: - throw new MetaException("Unexpected transaction state " + c + + default: + throw new MetaException("Unexpected transaction state " + c + " found in txns table"); + } + txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4))); } - txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4))); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - return new GetOpenTxnsInfoResponse(hwm, txnInfo); - } catch (SQLException e) { - try { LOG.debug("Going to rollback"); dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to select from transaction database, " + return new GetOpenTxnsInfoResponse(hwm, txnInfo); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxnsInfo"); + throw new MetaException("Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + return getOpenTxnsInfo(); } } public GetOpenTxnsResponse getOpenTxns() throws MetaException { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - Statement stmt = null; try { - timeOutTxns(dbConn); - stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. + Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + timeOutTxns(dbConn); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + "initialized, null record found in next_txn_id"); - } + } - Set<Long> openList = new HashSet<Long>(); - s = "select txn_id from TXNS"; - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - openList.add(rs.getLong(1)); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - return new GetOpenTxnsResponse(hwm, openList); - } catch (SQLException e) { - try { + Set<Long> openList = new HashSet<Long>(); + s = "select txn_id from TXNS"; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + openList.add(rs.getLong(1)); + } LOG.debug("Going to rollback"); dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to select from transaction database, " + return new GetOpenTxnsResponse(hwm, openList); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxns"); + throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + return getOpenTxns(); } } @@ -259,12 +271,13 @@ public class TxnHandler { public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { int numTxns = rqst.getNum_txns(); try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); // Make sure the user has not requested an insane amount of txns. int maxTxns = HiveConf.getIntVar(conf, - HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); + HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); @@ -273,7 +286,7 @@ public class TxnHandler { ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); + "configured, can't find next transaction id."); } long first = rs.getLong(1); s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); @@ -281,8 +294,8 @@ public class TxnHandler { stmt.executeUpdate(s); long now = getDbTime(dbConn); s = "insert into TXNS (txn_id, txn_state, txn_started, " + - "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + - now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; + "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + + now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; LOG.debug("Going to prepare statement <" + s + ">"); PreparedStatement ps = dbConn.prepareStatement(s); List<Long> txnIds = new ArrayList<Long>(numTxns); @@ -296,30 +309,26 @@ public class TxnHandler { dbConn.commit(); return new OpenTxnsResponse(txnIds); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "openTxns"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "openTxns"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return openTxns(rqst); - } finally { - deadlockCnt = 0; } } public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException { long txnid = rqst.getTxnid(); try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); List<Long> txnids = new ArrayList<Long>(1); txnids.add(txnid); if (abortTxns(dbConn, txnids) != 1) { @@ -331,31 +340,27 @@ public class TxnHandler { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "abortTxn"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxn"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { abortTxn(rqst); - } finally { - deadlockCnt = 0; } } public void commitTxn(CommitTxnRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchTxnException, TxnAbortedException, MetaException { long txnid = rqst.getTxnid(); try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to // commit it, but it does two things. One, it makes sure the transaction is still valid. @@ -367,11 +372,11 @@ public class TxnHandler { // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + - "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; + "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); if (stmt.executeUpdate(s) < 1) { LOG.warn("Expected to move at least one record from txn_components to " + - "completed_txn_components when committing txn!"); + "completed_txn_components when committing txn!"); } // Always access TXN_COMPONENTS before HIVE_LOCKS; @@ -388,80 +393,68 @@ public class TxnHandler { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "commitTxn"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "commitTxn"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { commitTxn(rqst); - } finally { - deadlockCnt = 0; } } public LockResponse lock(LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchTxnException, TxnAbortedException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); return lock(dbConn, rqst, true); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "lock"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "lock"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return lock(rqst); - } finally { - deadlockCnt = 0; } } public LockResponse lockNoWait(LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchTxnException, TxnAbortedException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); return lock(dbConn, rqst, false); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "lockNoWait"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "lockNoWait"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return lockNoWait(rqst); - } finally { - deadlockCnt = 0; } } public LockResponse checkLock(CheckLockRequest rqst) - throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); long extLockId = rqst.getLockid(); // Clean up timed out locks timeOutLocks(dbConn); @@ -474,31 +467,27 @@ public class TxnHandler { if (txnid > 0) heartbeatTxn(dbConn, txnid); return checkLock(dbConn, extLockId, true); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "checkLock"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "checkLock"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return checkLock(rqst); - } finally { - deadlockCnt = 0; } } public void unlock(UnlockRequest rqst) - throws NoSuchLockException, TxnOpenException, MetaException { + throws NoSuchLockException, TxnOpenException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); // Odd as it seems, we need to heartbeat first because this touches the // lock table and assures that our locks our still valid. If they are // not, this will throw an exception and the heartbeat will fail. @@ -512,8 +501,8 @@ public class TxnHandler { LOG.debug("Going to rollback"); dbConn.rollback(); String msg = "Unlocking locks associated with transaction" + - " not permitted. Lockid " + extLockId + " is associated with " + - "transaction " + txnid; + " not permitted. Lockid " + extLockId + " is associated with " + + "transaction " + txnid; LOG.error(msg); throw new TxnOpenException(msg); } @@ -529,97 +518,96 @@ public class TxnHandler { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "unlock"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "unlock"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { unlock(rqst); - } finally { - deadlockCnt = 0; } } public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - ShowLocksResponse rsp = new ShowLocksResponse(); - List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>(); - Statement stmt = null; try { - stmt = dbConn.createStatement(); + Connection dbConn = null; + ShowLocksResponse rsp = new ShowLocksResponse(); + List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>(); + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + + String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS"; - LOG.debug("Doing to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - while (rs.next()) { - ShowLocksResponseElement e = new ShowLocksResponseElement(); - e.setLockid(rs.getLong(1)); - long txnid = rs.getLong(2); - if (!rs.wasNull()) e.setTxnid(txnid); - e.setDbname(rs.getString(3)); - e.setTablename(rs.getString(4)); - String partition = rs.getString(5); - if (partition != null) e.setPartname(partition); - switch (rs.getString(6).charAt(0)) { - case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break; - case LOCK_WAITING: e.setState(LockState.WAITING); break; - default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0)); - } - switch (rs.getString(7).charAt(0)) { - case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break; - case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break; - case LOCK_SHARED: e.setType(LockType.SHARED_READ); break; - default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0)); - } - e.setLastheartbeat(rs.getLong(8)); - long acquiredAt = rs.getLong(9); - if (!rs.wasNull()) e.setAcquiredat(acquiredAt); - e.setUser(rs.getString(10)); - e.setHostname(rs.getString(11)); - elems.add(e); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e) { - throw new MetaException("Unable to select from transaction database " + + LOG.debug("Doing to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + ShowLocksResponseElement e = new ShowLocksResponseElement(); + e.setLockid(rs.getLong(1)); + long txnid = rs.getLong(2); + if (!rs.wasNull()) e.setTxnid(txnid); + e.setDbname(rs.getString(3)); + e.setTablename(rs.getString(4)); + String partition = rs.getString(5); + if (partition != null) e.setPartname(partition); + switch (rs.getString(6).charAt(0)) { + case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break; + case LOCK_WAITING: e.setState(LockState.WAITING); break; + default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0)); + } + switch (rs.getString(7).charAt(0)) { + case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break; + case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break; + case LOCK_SHARED: e.setType(LockType.SHARED_READ); break; + default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0)); + } + e.setLastheartbeat(rs.getLong(8)); + long acquiredAt = rs.getLong(9); + if (!rs.wasNull()) e.setAcquiredat(acquiredAt); + e.setUser(rs.getString(10)); + e.setHostname(rs.getString(11)); + elems.add(e); + } + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e) { + checkRetryable(dbConn, e, "showLocks"); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + rsp.setLocks(elems); + return rsp; + } catch (RetryException e) { + return showLocks(rqst); } - rsp.setLocks(elems); - return rsp; } public void heartbeat(HeartbeatRequest ids) - throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); heartbeatLock(dbConn, ids.getLockid()); heartbeatTxn(dbConn, ids.getTxnid()); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "heartbeat"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "heartbeat"); throw new MetaException("Unable to select from transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { heartbeat(ids); } finally { deadlockCnt = 0; @@ -627,15 +615,16 @@ public class TxnHandler { } public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) - throws MetaException { + throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse(); Set<Long> nosuch = new HashSet<Long>(); Set<Long> aborted = new HashSet<Long>(); rsp.setNosuch(nosuch); rsp.setAborted(aborted); try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { try { heartbeatTxn(dbConn, txn); @@ -647,18 +636,15 @@ public class TxnHandler { } return rsp; } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "heartbeatTxnRange"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "heartbeatTxnRange"); throw new MetaException("Unable to select from transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return heartbeatTxnRange(rqst); } } @@ -666,9 +652,10 @@ public class TxnHandler { public void compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue. try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); // Get the id for the next entry in the queue @@ -679,7 +666,7 @@ public class TxnHandler { LOG.debug("Going to rollback"); dbConn.rollback(); throw new MetaException("Transaction tables not properly initiated, " + - "no record found in next_compaction_queue_id"); + "no record found in next_compaction_queue_id"); } long id = rs.getLong(1); s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); @@ -687,7 +674,7 @@ public class TxnHandler { stmt.executeUpdate(s); StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + - "cq_table, "); + "cq_table, "); String partName = rqst.getPartitionname(); if (partName != null) buf.append("cq_partition, "); buf.append("cq_state, cq_type"); @@ -730,71 +717,69 @@ public class TxnHandler { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "compact"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "compact"); throw new MetaException("Unable to select from transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { compact(rqst); - } finally { - deadlockCnt = 0; } } public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException { ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>()); - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Connection dbConn = null; Statement stmt = null; try { - stmt = dbConn.createStatement(); - String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + - "cq_start, cq_run_as from COMPACTION_QUEUE"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - while (rs.next()) { - ShowCompactResponseElement e = new ShowCompactResponseElement(); - e.setDbname(rs.getString(1)); - e.setTablename(rs.getString(2)); - e.setPartitionname(rs.getString(3)); - switch (rs.getString(4).charAt(0)) { - case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break; - case WORKING_STATE: e.setState(WORKING_RESPONSE); break; - case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break; - default: throw new MetaException("Unexpected compaction state " + rs.getString(4)); - } - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break; - case MINOR_TYPE: e.setType(CompactionType.MINOR); break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); - } - e.setWorkerid(rs.getString(6)); - e.setStart(rs.getLong(7)); - e.setRunAs(rs.getString(8)); - response.addToCompacts(e); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e) { - LOG.debug("Going to rollback"); try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + + "cq_start, cq_run_as from COMPACTION_QUEUE"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + ShowCompactResponseElement e = new ShowCompactResponseElement(); + e.setDbname(rs.getString(1)); + e.setTablename(rs.getString(2)); + e.setPartitionname(rs.getString(3)); + switch (rs.getString(4).charAt(0)) { + case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break; + case WORKING_STATE: e.setState(WORKING_RESPONSE); break; + case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break; + default: throw new MetaException("Unexpected compaction state " + rs.getString(4)); + } + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break; + case MINOR_TYPE: e.setType(CompactionType.MINOR); break; + default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + } + e.setWorkerid(rs.getString(6)); + e.setStart(rs.getLong(7)); + e.setRunAs(rs.getString(8)); + response.addToCompacts(e); + } + LOG.debug("Going to rollback"); dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to select from transaction database " + + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "showCompact"); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + return response; + } catch (RetryException e) { + return showCompact(rqst); } - return response; } /** @@ -828,7 +813,7 @@ public class TxnHandler { return previous_timeout; } - protected class DeadlockException extends Exception { + protected class RetryException extends Exception { } @@ -839,26 +824,28 @@ public class TxnHandler { * @return db connection * @throws MetaException if the connection cannot be obtained */ - protected Connection getDbConn(int isolationLevel) throws MetaException { + protected Connection getDbConn(int isolationLevel) throws SQLException { + Connection dbConn = connPool.getConnection(); + dbConn.setAutoCommit(false); + dbConn.setTransactionIsolation(isolationLevel); + return dbConn; + } + + void rollbackDBConn(Connection dbConn) { try { - Connection dbConn = connPool.getConnection(); - dbConn.setAutoCommit(false); - dbConn.setTransactionIsolation(isolationLevel); - return dbConn; + if (dbConn != null) dbConn.rollback(); } catch (SQLException e) { - String msg = "Unable to get jdbc connection from pool, " + e.getMessage(); - throw new MetaException(msg); + LOG.warn("Failed to rollback db connection " + getMessage(e)); } } - protected void closeDbConn(Connection dbConn) { try { if (dbConn != null) dbConn.close(); } catch (SQLException e) { - LOG.warn("Failed to close db connection " + e.getMessage()); + LOG.warn("Failed to close db connection " + getMessage(e)); } } - + /** * Close statement instance. * @param stmt statement instance. @@ -867,7 +854,7 @@ public class TxnHandler { try { if (stmt != null) stmt.close(); } catch (SQLException e) { - LOG.warn("Failed to close statement " + e.getMessage()); + LOG.warn("Failed to close statement " + getMessage(e)); } } @@ -882,7 +869,7 @@ public class TxnHandler { } } catch(SQLException ex) { - LOG.warn("Failed to close statement " + ex.getMessage()); + LOG.warn("Failed to close statement " + getMessage(ex)); } } @@ -895,18 +882,18 @@ public class TxnHandler { closeDbConn(dbConn); } /** - * Determine if an exception was a deadlock. Unfortunately there is no standard way to do + * Determine if an exception was such that it makse sense to retry. Unfortunately there is no standard way to do * this, so we have to inspect the error messages and catch the telltale signs for each * different database. * @param conn database connection * @param e exception that was thrown. * @param caller name of the method calling this - * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock + * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when deadlock * detected and retry count has not been exceeded. */ - protected void detectDeadlock(Connection conn, + protected void checkRetryable(Connection conn, SQLException e, - String caller) throws DeadlockException, MetaException { + String caller) throws RetryException, MetaException { // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected() // to test these changes. @@ -919,19 +906,41 @@ public class TxnHandler { determineDatabaseProduct(conn); } if (e instanceof SQLTransactionRollbackException || - ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || - dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) || - (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) || - (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected") - || e.getMessage().contains("can't serialize access for this transaction")))) { + ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || + dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) || + (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) || + (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected") + || e.getMessage().contains("can't serialize access for this transaction")))) { if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) { LOG.warn("Deadlock detected in " + caller + ", trying again."); - throw new DeadlockException(); + throw new RetryException(); } else { LOG.error("Too many repeated deadlocks in " + caller + ", giving up."); deadlockCnt = 0; } } + else if(isRetryable(e)) { + //in MSSQL this means Communication Link Failure + if(retryNum++ < retryLimit) { + try { + Thread.sleep(retryInterval); + } + catch(InterruptedException ex) { + // + } + LOG.warn("Retryable error detected in " + caller + ", trying again: " + getMessage(e)); + throw new RetryException(); + } + else { + LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e)); + retryNum = 0; + } + } + else { + //if here, we got something that will propagate the error (rather than retry), so reset counters + deadlockCnt = 0; + retryNum = 0; + } } /** @@ -1073,10 +1082,10 @@ public class TxnHandler { @Override public String toString() { return "extLockId:" + Long.toString(extLockId) + " intLockId:" + - intLockId + " txnId:" + Long.toString - (txnId) + " db:" + db + " table:" + table + " partition:" + - partition + " state:" + (state == null ? "null" : state.toString()) - + " type:" + (type == null ? "null" : type.toString()); + intLockId + " txnId:" + Long.toString + (txnId) + " db:" + db + " table:" + table + " partition:" + + partition + " state:" + (state == null ? "null" : state.toString()) + + " type:" + (type == null ? "null" : type.toString()); } } @@ -1088,11 +1097,11 @@ public class TxnHandler { public int compare(LockInfo info1, LockInfo info2) { // We sort by state (acquired vs waiting) and then by extLockId. if (info1.state == LockState.ACQUIRED && - info2.state != LockState .ACQUIRED) { + info2.state != LockState .ACQUIRED) { return -1; } if (info1.state != LockState.ACQUIRED && - info2.state == LockState .ACQUIRED) { + info2.state == LockState .ACQUIRED) { return 1; } if (info1.extLockId < info2.extLockId) { @@ -1124,7 +1133,7 @@ public class TxnHandler { private void checkQFileTestHack() { boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST) || - HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST); + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST); if (hackOn) { LOG.info("Hacking in canned values for transaction manager"); // Set up the transaction/locking db in the derby metastore @@ -1135,7 +1144,7 @@ public class TxnHandler { // We may have already created the tables and thus don't need to redo it. if (!e.getMessage().contains("already exists")) { throw new RuntimeException("Unable to set up transaction database for" + - " testing: " + e.getMessage()); + " testing: " + e.getMessage()); } } } @@ -1153,7 +1162,7 @@ public class TxnHandler { int updateCnt = 0; try { stmt = dbConn.createStatement(); - + // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); boolean first = true; @@ -1165,7 +1174,7 @@ public class TxnHandler { buf.append(')'); LOG.debug("Going to execute update <" + buf.toString() + ">"); stmt.executeUpdate(buf.toString()); - + buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in ("); first = true; for (Long id : txnids) { @@ -1176,7 +1185,7 @@ public class TxnHandler { buf.append(')'); LOG.debug("Going to execute update <" + buf.toString() + ">"); updateCnt = stmt.executeUpdate(buf.toString()); - + LOG.debug("Going to commit"); dbConn.commit(); } finally { @@ -1202,7 +1211,7 @@ public class TxnHandler { * @throws TxnAbortedException */ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) - throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { // We want to minimize the number of concurrent lock requests being issued. If we do not we // get a large number of deadlocks in the database, since this method has to both clean // timedout locks and insert new locks. This synchronization barrier will not eliminiate all @@ -1227,7 +1236,7 @@ public class TxnHandler { LOG.debug("Going to rollback"); dbConn.rollback(); throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_lock_id"); + "initialized, no record found in next_lock_id"); } long extLockId = rs.getLong(1); s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); @@ -1252,8 +1261,8 @@ public class TxnHandler { s = "insert into TXN_COMPONENTS " + "(tc_txnid, tc_database, tc_table, tc_partition) " + "values (" + txnid + ", '" + dbName + "', " + - (tblName == null ? "null" : "'" + tblName + "'") + ", " + - (partName == null ? "null" : "'" + partName + "'") + ")"; + (tblName == null ? "null" : "'" + tblName + "'") + ", " + + (partName == null ? "null" : "'" + partName + "'") + ")"; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } @@ -1275,13 +1284,13 @@ public class TxnHandler { long now = getDbTime(dbConn); s = "insert into HIVE_LOCKS " + " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + - " values (" + extLockId + ", " + - + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + - dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) - + ", " + (partName == null ? "null" : "'" + partName + "'") + - ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" + - rqst.getUser() + "', '" + rqst.getHostname() + "')"; + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + + " values (" + extLockId + ", " + + + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) + + ", " + (partName == null ? "null" : "'" + partName + "'") + + ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" + + rqst.getUser() + "', '" + rqst.getHostname() + "')"; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } @@ -1305,7 +1314,7 @@ public class TxnHandler { private LockResponse checkLock(Connection dbConn, long extLockId, boolean alwaysCommit) - throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); LockResponse response = new LockResponse(); response.setLockid(extLockId); @@ -1313,8 +1322,8 @@ public class TxnHandler { LOG.debug("Setting savepoint"); Savepoint save = dbConn.setSavepoint(); StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type from HIVE_LOCKS where hl_db in ("); + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + + "hl_lock_type from HIVE_LOCKS where hl_db in ("); Set<String> strings = new HashSet<String>(locksBeingChecked.size()); for (LockInfo info : locksBeingChecked) { @@ -1430,7 +1439,7 @@ public class TxnHandler { // lock the whole database and we need to check it. Otherwise, // check if they are operating on the same table, if not, move on. if (locks[index].table != null && locks[i].table != null - && !locks[index].table.equals(locks[i].table)) { + && !locks[index].table.equals(locks[i].table)) { continue; } @@ -1438,30 +1447,30 @@ public class TxnHandler { // lock the whole table and we need to check it. Otherwise, // check if they are operating on the same partition, if not, move on. if (locks[index].partition != null && locks[i].partition != null - && !locks[index].partition.equals(locks[i].partition)) { + && !locks[index].partition.equals(locks[i].partition)) { continue; } // We've found something that matches what we're trying to lock, // so figure out if we can lock it too. switch (jumpTable.get(locks[index].type).get(locks[i].type).get - (locks[i].state)) { - case ACQUIRE: - acquire(dbConn, stmt, extLockId, info.intLockId); - acquired = true; - break; - case WAIT: - wait(dbConn, save); - if (alwaysCommit) { - // In the case where lockNoWait has been called we don't want to commit because - // it's going to roll everything back. In every other case we want to commit here. - LOG.debug("Going to commit"); - dbConn.commit(); - } - response.setState(LockState.WAITING); - return response; - case KEEP_LOOKING: - continue; + (locks[i].state)) { + case ACQUIRE: + acquire(dbConn, stmt, extLockId, info.intLockId); + acquired = true; + break; + case WAIT: + wait(dbConn, save); + if (alwaysCommit) { + // In the case where lockNoWait has been called we don't want to commit because + // it's going to roll everything back. In every other case we want to commit here. + LOG.debug("Going to commit"); + dbConn.commit(); + } + response.setState(LockState.WAITING); + return response; + case KEEP_LOOKING: + continue; } if (acquired) break; // We've acquired this lock component, // so get out of the loop and look at the next component. @@ -1494,18 +1503,18 @@ public class TxnHandler { } private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId) - throws SQLException, NoSuchLockException, MetaException { + throws SQLException, NoSuchLockException, MetaException { long now = getDbTime(dbConn); String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + - "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + - extLockId + " and hl_lock_int_id = " + intLockId; + "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + + extLockId + " and hl_lock_int_id = " + intLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); throw new NoSuchLockException("No such lock: (" + extLockId + "," + - + intLockId + ")"); + + intLockId + ")"); } // We update the database, but we don't commit because there may be other // locks together with this, and we only want to acquire one if we can @@ -1514,7 +1523,7 @@ public class TxnHandler { // Heartbeats on the lock table. This commits, so do not enter it with any state private void heartbeatLock(Connection dbConn, long extLockId) - throws NoSuchLockException, SQLException, MetaException { + throws NoSuchLockException, SQLException, MetaException { // If the lock id is 0, then there are no locks in this heartbeat if (extLockId == 0) return; Statement stmt = null; @@ -1523,7 +1532,7 @@ public class TxnHandler { long now = getDbTime(dbConn); String s = "update HIVE_LOCKS set hl_last_heartbeat = " + - now + " where hl_lock_ext_id = " + extLockId; + now + " where hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -1540,7 +1549,7 @@ public class TxnHandler { // Heartbeats on the txn table. This commits, so do not enter it with any state private void heartbeatTxn(Connection dbConn, long txnid) - throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { + throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { // If the txnid is 0, then there are no transactions in this heartbeat if (txnid == 0) return; Statement stmt = null; @@ -1560,10 +1569,10 @@ public class TxnHandler { LOG.debug("Going to rollback"); dbConn.rollback(); throw new TxnAbortedException("Transaction " + txnid + - " already aborted"); + " already aborted"); } s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid; + " where txn_id = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1575,17 +1584,17 @@ public class TxnHandler { // NEVER call this function without first calling heartbeat(long, long) private long getTxnIdFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { + throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; try { stmt = dbConn.createStatement(); String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + - extLockId; + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); + "checked the lock existed but now we can't find it!"); } long txnid = rs.getLong(1); LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid)); @@ -1597,13 +1606,13 @@ public class TxnHandler { // NEVER call this function without first calling heartbeat(long, long) private List<LockInfo> getLockInfoFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { + throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; try { stmt = dbConn.createStatement(); String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; + "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + + "hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); boolean sawAtLeastOne = false; @@ -1614,7 +1623,7 @@ public class TxnHandler { } if (!sawAtLeastOne) { throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); + "checked the lock existed but now we can't find it!"); } return ourLockInfo; } finally { @@ -1632,7 +1641,7 @@ public class TxnHandler { stmt = dbConn.createStatement(); // Remove any timed out locks from the table. String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout); + (now - timeout); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1652,7 +1661,7 @@ public class TxnHandler { stmt = dbConn.createStatement(); // Abort any timed out locks from the table. String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + - "' and txn_last_heartbeat < " + (now - timeout); + "' and txn_last_heartbeat < " + (now - timeout); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); List<Long> deadTxns = new ArrayList<Long>(); @@ -1675,12 +1684,12 @@ public class TxnHandler { String passwd; try { passwd = ShimLoader.getHadoopShims().getPassword(conf, - HiveConf.ConfVars.METASTOREPWD.varname); + HiveConf.ConfVars.METASTOREPWD.varname); } catch (IOException err) { throw new SQLException("Error getting metastore password", err); } String connectionPooler = HiveConf.getVar(conf, - HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase(); + HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase(); if ("bonecp".equals(connectionPooler)) { BoneCPConfig config = new BoneCPConfig(); @@ -1696,22 +1705,22 @@ public class TxnHandler { // This doesn't get used, but it's still necessary, see // http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup PoolableConnectionFactory poolConnFactory = - new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); + new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); connPool = new PoolingDataSource(objectPool); } else { throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler); } } - private static synchronized void buildJumpTable() { + private static synchronized void buildJumpTable() { if (jumpTable != null) return; jumpTable = - new HashMap<LockType, Map<LockType, Map<LockState, LockAction>>>(3); + new HashMap<LockType, Map<LockType, Map<LockState, LockAction>>>(3); // SR: Lock we are trying to acquire is shared read Map<LockType, Map<LockState, LockAction>> m = - new HashMap<LockType, Map<LockState, LockAction>>(3); + new HashMap<LockType, Map<LockState, LockAction>>(3); jumpTable.put(LockType.SHARED_READ, m); // SR.SR: Lock we are examining is shared read @@ -1743,7 +1752,7 @@ public class TxnHandler { // that something is blocking it that would not block a read. m2.put(LockState.WAITING, LockAction.KEEP_LOOKING); - // SR.E: Lock we are examining is exclusive + // SR.E: Lock we are examining is exclusive m2 = new HashMap<LockState, LockAction>(2); m.put(LockType.EXCLUSIVE, m2); @@ -1777,7 +1786,7 @@ public class TxnHandler { m2.put(LockState.ACQUIRED, LockAction.WAIT); m2.put(LockState.WAITING, LockAction.WAIT); - // SW.E: Lock we are examining is exclusive + // SW.E: Lock we are examining is exclusive m2 = new HashMap<LockState, LockAction>(2); m.put(LockType.EXCLUSIVE, m2); @@ -1805,7 +1814,7 @@ public class TxnHandler { m2.put(LockState.ACQUIRED, LockAction.WAIT); m2.put(LockState.WAITING, LockAction.WAIT); - // E.E: Lock we are examining is exclusive + // E.E: Lock we are examining is exclusive m2 = new HashMap<LockState, LockAction>(2); m.put(LockType.EXCLUSIVE, m2); @@ -1813,4 +1822,20 @@ public class TxnHandler { m2.put(LockState.ACQUIRED, LockAction.WAIT); m2.put(LockState.WAITING, LockAction.WAIT); } + /** + * Returns true if {@code ex} should be retried + */ + private static boolean isRetryable(Exception ex) { + if(ex instanceof SQLException) { + SQLException sqlException = (SQLException)ex; + if("08S01".equalsIgnoreCase(sqlException.getSQLState())) { + //in MSSQL this means Communication Link Failure + return true; + } + } + return false; + } + private static String getMessage(SQLException ex) { + return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")"; + } }
Modified: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java?rev=1652558&r1=1652557&r2=1652558&view=diff ============================================================================== --- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (original) +++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java Sat Jan 17 02:54:40 2015 @@ -1124,11 +1124,11 @@ public class TestTxnHandler { LOG.debug("no exception, no deadlock"); } catch (SQLException e) { try { - txnHandler.detectDeadlock(conn1, e, "thread t1"); + txnHandler.checkRetryable(conn1, e, "thread t1"); LOG.debug("Got an exception, but not a deadlock, SQLState is " + e.getSQLState() + " class of exception is " + e.getClass().getName() + " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.DeadlockException de) { + } catch (TxnHandler.RetryException de) { LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + "exception is " + e.getClass().getName() + " msg is <" + e .getMessage() + ">"); @@ -1154,11 +1154,11 @@ public class TestTxnHandler { LOG.debug("no exception, no deadlock"); } catch (SQLException e) { try { - txnHandler.detectDeadlock(conn2, e, "thread t2"); + txnHandler.checkRetryable(conn2, e, "thread t2"); LOG.debug("Got an exception, but not a deadlock, SQLState is " + e.getSQLState() + " class of exception is " + e.getClass().getName() + " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.DeadlockException de) { + } catch (TxnHandler.RetryException de) { LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + "exception is " + e.getClass().getName() + " msg is <" + e .getMessage() + ">");