Author: ekoifman Date: Sat Jan 24 01:05:12 2015 New Revision: 1654442 URL: http://svn.apache.org/r1654442 Log: HIVE-9390 Enhance retry logic wrt DB access in TxnHandler
Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java hive/branches/branch-0.14/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1654442&r1=1654441&r2=1654442&view=diff ============================================================================== --- hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original) +++ hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Sat Jan 24 01:05:12 2015 @@ -5318,126 +5318,74 @@ public class HiveMetaStore extends Thrif // Transaction and locking methods @Override public GetOpenTxnsResponse get_open_txns() throws TException { - try { - return getTxnHandler().getOpenTxns(); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().getOpenTxns(); } // Transaction and locking methods @Override public GetOpenTxnsInfoResponse get_open_txns_info() throws TException { - try { - return getTxnHandler().getOpenTxnsInfo(); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().getOpenTxnsInfo(); } @Override public OpenTxnsResponse open_txns(OpenTxnRequest rqst) throws TException { - try { - return getTxnHandler().openTxns(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().openTxns(rqst); } @Override public void abort_txn(AbortTxnRequest rqst) throws NoSuchTxnException, TException { - try { - getTxnHandler().abortTxn(rqst); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().abortTxn(rqst); } @Override public void commit_txn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, TException { - try { - getTxnHandler().commitTxn(rqst); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().commitTxn(rqst); } @Override public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, TException { - try { - return getTxnHandler().lock(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().lock(rqst); } @Override public LockResponse check_lock(CheckLockRequest rqst) throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, TException { - try { - return getTxnHandler().checkLock(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().checkLock(rqst); } @Override public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, TException { - try { - getTxnHandler().unlock(rqst); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().unlock(rqst); } @Override public ShowLocksResponse show_locks(ShowLocksRequest rqst) throws TException { - try { - return getTxnHandler().showLocks(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().showLocks(rqst); } @Override public void heartbeat(HeartbeatRequest ids) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, TException { - try { - getTxnHandler().heartbeat(ids); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().heartbeat(ids); } @Override public HeartbeatTxnRangeResponse heartbeat_txn_range(HeartbeatTxnRangeRequest rqst) throws TException { - try { - return getTxnHandler().heartbeatTxnRange(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().heartbeatTxnRange(rqst); } @Override public void compact(CompactionRequest rqst) throws TException { - try { - getTxnHandler().compact(rqst); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().compact(rqst); } @Override public ShowCompactResponse show_compact(ShowCompactRequest rqst) throws TException { - try { - return getTxnHandler().showCompact(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().showCompact(rqst); } @Override Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1654442&r1=1654441&r2=1654442&view=diff ============================================================================== --- hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original) +++ hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Sat Jan 24 01:05:12 2015 @@ -52,51 +52,58 @@ public class CompactionTxnHandler extend * or runAs set since these are only potential compactions not actual ones. */ public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException { - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Connection dbConn = null; Set<CompactionInfo> response = new HashSet<CompactionInfo>(); Statement stmt = null; try { - stmt = dbConn.createStatement(); - // Check for completed transactions - String s = "select distinct ctc_database, ctc_table, " + + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + // Check for completed transactions + String s = "select distinct ctc_database, ctc_table, " + "ctc_partition from COMPLETED_TXN_COMPONENTS"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.dbname = rs.getString(1); - info.tableName = rs.getString(2); - info.partName = rs.getString(3); - response.add(info); - } + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.dbname = rs.getString(1); + info.tableName = rs.getString(2); + info.partName = rs.getString(3); + response.add(info); + } - // Check for aborted txns - s = "select tc_database, tc_table, tc_partition " + + // Check for aborted txns + s = "select tc_database, tc_table, tc_partition " + "from TXNS, TXN_COMPONENTS " + "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + "group by tc_database, tc_table, tc_partition " + "having count(*) > " + maxAborted; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.dbname = rs.getString(1); - info.tableName = rs.getString(2); - info.partName = rs.getString(3); - info.tooManyAborts = true; - response.add(info); - } + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.dbname = rs.getString(1); + info.tableName = rs.getString(2); + info.partName = rs.getString(3); + info.tooManyAborts = true; + response.add(info); + } - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e) { - LOG.error("Unable to connect to transaction database " + e.getMessage()); - } finally { - closeDbConn(dbConn); - closeStmt(stmt); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e) { + LOG.error("Unable to connect to transaction database " + e.getMessage()); + checkRetryable(dbConn, e, "findPotentialCompactions"); + } finally { + closeDbConn(dbConn); + closeStmt(stmt); + } + return response; + } + catch (RetryException e) { + return findPotentialCompactions(maxAborted); } - return response; } /** @@ -107,35 +114,31 @@ public class CompactionTxnHandler extend */ public void setRunAs(long cq_id, String user) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { - stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; - LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to update compaction record"); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to update compaction queue, " + e.getMessage()); - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "setRunAs"); - } finally { - closeDbConn(dbConn); - closeStmt(stmt); - } - } catch (DeadlockException e) { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; + LOG.debug("Going to execute update <" + s + ">"); + if (stmt.executeUpdate(s) != 1) { + LOG.error("Unable to update compaction record"); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to update compaction queue, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "setRunAs"); + } finally { + closeDbConn(dbConn); + closeStmt(stmt); + } + } catch (RetryException e) { setRunAs(cq_id, user); - } finally { - deadlockCnt = 0; } } @@ -147,14 +150,15 @@ public class CompactionTxnHandler extend */ public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; CompactionInfo info = new CompactionInfo(); Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; + "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -175,7 +179,7 @@ public class CompactionTxnHandler extend // Now, update this record as being worked on by this worker. long now = getDbTime(dbConn); s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -187,38 +191,34 @@ public class CompactionTxnHandler extend return info; } catch (SQLException e) { LOG.error("Unable to select next element for compaction, " + e.getMessage()); - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "findNextToCompact"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findNextToCompact"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { return findNextToCompact(workerId); - } finally { - deadlockCnt = 0; } } /** * This will mark an entry in the queue as compacted * and put it in the ready to clean state. - * @param info info on the compaciton entry to mark as compacted. + * @param info info on the compaction entry to mark as compacted. */ public void markCompacted(CompactionInfo info) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + - "cq_worker_id = null where cq_id = " + info.id; + "cq_worker_id = null where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -228,23 +228,18 @@ public class CompactionTxnHandler extend LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.error("Unable to update compaction queue " + e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "markCompacted"); + LOG.error("Unable to update compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "markCompacted"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { markCompacted(info); - } finally { - deadlockCnt = 0; } } @@ -254,45 +249,48 @@ public class CompactionTxnHandler extend * @return information on the entry in the queue. */ public List<CompactionInfo> findReadyToClean() throws MetaException { - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Connection dbConn = null; List<CompactionInfo> rc = new ArrayList<CompactionInfo>(); Statement stmt = null; try { - stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select cq_id, cq_database, cq_table, cq_partition, " + "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; - case MINOR_TYPE: info.type = CompactionType.MINOR; break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.id = rs.getLong(1); + info.dbname = rs.getString(2); + info.tableName = rs.getString(3); + info.partName = rs.getString(4); + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; + case MINOR_TYPE: info.type = CompactionType.MINOR; break; + default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + } + info.runAs = rs.getString(6); + rc.add(info); } - info.runAs = rs.getString(6); - rc.add(info); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - return rc; - } catch (SQLException e) { - LOG.error("Unable to select next element for cleaning, " + e.getMessage()); - try { LOG.debug("Going to rollback"); dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to connect to transaction database " + + return rc; + } catch (SQLException e) { + LOG.error("Unable to select next element for cleaning, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findReadyToClean"); + throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); - } finally { - closeDbConn(dbConn); - closeStmt(stmt); + } finally { + closeDbConn(dbConn); + closeStmt(stmt); + } + } catch (RetryException e) { + return findReadyToClean(); } } @@ -303,9 +301,10 @@ public class CompactionTxnHandler extend */ public void markCleaned(CompactionInfo info) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); @@ -318,20 +317,20 @@ public class CompactionTxnHandler extend // Remove entries from completed_txn_components as well, so we don't start looking there // again. s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + - "ctc_table = '" + info.tableName + "'"; + "ctc_table = '" + info.tableName + "'"; if (info.partName != null) { s += " and ctc_partition = '" + info.partName + "'"; } LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) < 1) { LOG.error("Expected to remove at least one row from completed_txn_components when " + - "marking compaction entry as clean!"); + "marking compaction entry as clean!"); } s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'"; + TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + + info.tableName + "'"; if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; LOG.debug("Going to execute update <" + s + ">"); ResultSet rs = stmt.executeQuery(s); @@ -371,23 +370,18 @@ public class CompactionTxnHandler extend LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.error("Unable to delete from compaction queue " + e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "markCleaned"); + LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "markCleaned"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { markCleaned(info); - } finally { - deadlockCnt = 0; } } @@ -396,13 +390,14 @@ public class CompactionTxnHandler extend */ public void cleanEmptyAbortedTxns() throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "select txn_id from TXNS where " + - "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + - "txn_state = '" + TXN_ABORTED + "'"; + "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + + "txn_state = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); Set<Long> txnids = new HashSet<Long>(); @@ -425,21 +420,16 @@ public class CompactionTxnHandler extend } catch (SQLException e) { LOG.error("Unable to delete from txns table " + e.getMessage()); LOG.debug("Going to rollback"); - try { - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "cleanEmptyAbortedTxns"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "cleanEmptyAbortedTxns"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { cleanEmptyAbortedTxns(); - } finally { - deadlockCnt = 0; } } @@ -454,13 +444,14 @@ public class CompactionTxnHandler extend */ public void revokeFromLocalWorkers(String hostname) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" - + hostname + "%'"; + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" + + hostname + "%'"; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died // with nothing assigned to them. @@ -468,24 +459,19 @@ public class CompactionTxnHandler extend LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.error("Unable to change dead worker's records back to initiated state " + - e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "revokeFromLocalWorkers"); + LOG.error("Unable to change dead worker's records back to initiated state " + + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "revokeFromLocalWorkers"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { revokeFromLocalWorkers(hostname); - } finally { - deadlockCnt = 0; } } @@ -500,14 +486,15 @@ public class CompactionTxnHandler extend */ public void revokeTimedoutWorkers(long timeout) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - long latestValidStart = getDbTime(dbConn) - timeout; + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + long latestValidStart = getDbTime(dbConn) - timeout; stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " - + latestValidStart; + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " + + latestValidStart; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died // with nothing assigned to them. @@ -515,24 +502,19 @@ public class CompactionTxnHandler extend LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.error("Unable to change dead worker's records back to initiated state " + - e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "revokeTimedoutWorkers"); + LOG.error("Unable to change dead worker's records back to initiated state " + + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "revokeTimedoutWorkers"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { revokeTimedoutWorkers(timeout); - } finally { - deadlockCnt = 0; } } @@ -543,53 +525,55 @@ public class CompactionTxnHandler extend * @throws MetaException */ public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException { - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - String quote = getIdentifierQuoteString(dbConn); - stmt = dbConn.createStatement(); - StringBuilder bldr = new StringBuilder(); - bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + String quote = getIdentifierQuoteString(dbConn); + stmt = dbConn.createStatement(); + StringBuilder bldr = new StringBuilder(); + bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) .append(" FROM ") .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")) - .append(quote) + .append(quote) .append(" WHERE ") .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname) - .append("' AND ").append(quote).append("TABLE_NAME").append(quote) - .append(" = '").append(ci.tableName).append("'"); - if (ci.partName != null) { - bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '") + .append("' AND ").append(quote).append("TABLE_NAME").append(quote) + .append(" = '").append(ci.tableName).append("'"); + if (ci.partName != null) { + bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '") .append(ci.partName).append("'"); - } - String s = bldr.toString(); + } + String s = bldr.toString(); /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS") + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'" + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/ - LOG.debug("Going to execute <" + s + ">"); - rs = stmt.executeQuery(s); - List<String> columns = new ArrayList<String>(); - while(rs.next()) { - columns.add(rs.getString(1)); - } - LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName + - (ci.partName == null ? "" : "/" + ci.partName)); - dbConn.commit(); - return columns; - } catch (SQLException e) { - try { + LOG.debug("Going to execute <" + s + ">"); + rs = stmt.executeQuery(s); + List<String> columns = new ArrayList<String>(); + while (rs.next()) { + columns.add(rs.getString(1)); + } + LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName + + (ci.partName == null ? "" : "/" + ci.partName)); + dbConn.commit(); + return columns; + } catch (SQLException e) { LOG.error("Failed to find columns to analyze stats on for " + ci.tableName + - (ci.partName == null ? "" : "/" + ci.partName), e); - dbConn.rollback(); - } catch (SQLException e1) { - //nothing we can do here + (ci.partName == null ? "" : "/" + ci.partName), e); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findColumnsWithStats"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); } - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); + } catch (RetryException ex) { + return findColumnsWithStats(ci); } } }