HIVE-17340 TxnHandler.checkLock() - reduce number of SQL statements (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/75671197 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/75671197 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/75671197 Branch: refs/heads/master Commit: 75671197cfca8d013bc7b6a6346d6318b10ca83c Parents: 1938eeb Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri Aug 25 12:49:17 2017 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri Aug 25 12:49:17 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/metastore/txn/TxnHandler.java | 81 +++++++++++--------- 1 file changed, 46 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/75671197/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 71e7c0c..5bfc029 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -2444,10 +2444,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * * This is expected to run at READ_COMMITTED. * - * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take - * all locks for given extLockId or none. Would be more efficient to update state on all locks - * at once. Semantics are the same since this is all part of the same txn. - * * If there is a concurrent commitTxn/rollbackTxn, those can only remove rows from HIVE_LOCKS. * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid, * checkLock() will in the worst case keep locks in Waiting state a little longer. @@ -2654,7 +2650,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // Look at everything in front of this lock to see if it should block // it or not. - boolean acquired = false; for (int i = index - 1; i >= 0; i--) { // Check if we're operating on the same database, if not, move on if (!locks[index].db.equals(locks[i].db)) { @@ -2706,20 +2701,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } //fall through to ACQUIRE case ACQUIRE: - acquire(dbConn, stmt, extLockId, info); - acquired = true; break; case KEEP_LOOKING: continue; } - if (acquired) break; // We've acquired this lock component, - // so get out of the loop and look at the next component. + //if we got here, it means it's ok to acquire 'info' lock + break;// so exit the loop and check next lock } - - // If we've arrived here and we have not already acquired, it means there's nothing in the - // way of the lock, so acquire the lock. - if (!acquired) acquire(dbConn, stmt, extLockId, info); } + //if here, ther were no locks that blocked any locks in 'locksBeingChecked' - acquire them all + acquire(dbConn, stmt, locksBeingChecked); // We acquired all of the locks, so commit and return acquired. LOG.debug("Going to commit"); @@ -2733,6 +2724,48 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } return response; } + private void acquire(Connection dbConn, Statement stmt, List<LockInfo> locksBeingChecked) + throws SQLException, NoSuchLockException, MetaException { + if(locksBeingChecked == null || locksBeingChecked.isEmpty()) { + return; + } + long txnId = locksBeingChecked.get(0).txnId; + long extLockId = locksBeingChecked.get(0).extLockId; + long now = getDbTime(dbConn); + String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + + //if lock is part of txn, heartbeat info is in txn record + "hl_last_heartbeat = " + (isValidTxn(txnId) ? 0 : now) + + ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + + " where hl_lock_ext_id = " + extLockId; + LOG.debug("Going to execute update <" + s + ">"); + int rc = stmt.executeUpdate(s); + if (rc < locksBeingChecked.size()) { + LOG.debug("Going to rollback acquire(Connection dbConn, Statement stmt, List<LockInfo> locksBeingChecked)"); + dbConn.rollback(); + /*select all locks for this ext ID and see which ones are missing*/ + StringBuilder sb = new StringBuilder("No such lock(s): (" + JavaUtils.lockIdToString(extLockId) + ":"); + ResultSet rs = stmt.executeQuery("select hl_lock_int_id from HIVE_LOCKS where hl_lock_ext_id = " + extLockId); + while(rs.next()) { + int intLockId = rs.getInt(1); + int idx = 0; + for(; idx < locksBeingChecked.size(); idx++) { + LockInfo expectedLock = locksBeingChecked.get(idx); + if(expectedLock != null && expectedLock.intLockId == intLockId) { + locksBeingChecked.set(idx, null); + break; + } + } + } + for(LockInfo expectedLock : locksBeingChecked) { + if(expectedLock != null) { + sb.append(expectedLock.intLockId).append(","); + } + } + sb.append(") ").append(JavaUtils.txnIdToString(txnId)); + close(rs); + throw new NoSuchLockException(sb.toString()); + } + } /** * the {@link #jumpTable} only deals with LockState/LockType. In some cases it's not @@ -2775,28 +2808,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { LOG.debug("Going to rollback to savepoint"); dbConn.rollback(save); } - - private void acquire(Connection dbConn, Statement stmt, long extLockId, LockInfo lockInfo) - throws SQLException, NoSuchLockException, MetaException { - long now = getDbTime(dbConn); - String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + - //if lock is part of txn, heartbeat info is in txn record - "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) + - ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + " where hl_lock_ext_id = " + - extLockId + " and hl_lock_int_id = " + lockInfo.intLockId; - LOG.debug("Going to execute update <" + s + ">"); - int rc = stmt.executeUpdate(s); - if (rc < 1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new NoSuchLockException("No such lock: (" + JavaUtils.lockIdToString(extLockId) + "," + - + lockInfo.intLockId + ") " + JavaUtils.txnIdToString(lockInfo.txnId)); - } - // We update the database, but we don't commit because there may be other - // locks together with this, and we only want to acquire one if we can - // acquire all. - } - /** * Heartbeats on the lock table. This commits, so do not enter it with any state. * Should not be called on a lock that belongs to transaction.