Repository: hive Updated Branches: refs/heads/branch-1.2 e1b03dd46 -> 715018ac7
HIVE-10483 - insert overwrite partition deadlocks on itself with DbTxnManager (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/715018ac Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/715018ac Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/715018ac Branch: refs/heads/branch-1.2 Commit: 715018ac79e89c75e9b2e381175127ae6dbc6343 Parents: e1b03dd Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Tue Apr 28 17:46:24 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Tue Apr 28 17:46:24 2015 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/JavaUtils.java | 8 ++++++ .../hadoop/hive/metastore/txn/TxnHandler.java | 22 +++++++++++----- .../hadoop/hive/ql/lockmgr/DbLockManager.java | 9 +++++-- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 27 +++++++++++++++++++- 4 files changed, 57 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/715018ac/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java index a212fb8..3dd8f75 100644 --- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java @@ -136,6 +136,14 @@ public final class JavaUtils { LogFactory.release(loader); } + /** + * Utility method for ACID to normalize logging info + * @param extLockId LockResponse.lockid + */ + public static String lockIdToString(long extLockId) { + return "lockid:" + extLockId; + } + private JavaUtils() { // prevent instantiation } http://git-wip-us.apache.org/repos/asf/hive/blob/715018ac/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 1e64fc7..704c3ed 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -28,6 +28,7 @@ import org.apache.commons.dbcp.PoolingDataSource; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; @@ -1115,6 +1116,8 @@ public class TxnHandler { private static class LockInfo { private final long extLockId; private final long intLockId; + //0 means there is no transaction, i.e. it a select statement which is not part of + //explicit transaction or a IUD statement that is not writing to ACID table private final long txnId; private final String db; private final String table; @@ -1144,7 +1147,7 @@ public class TxnHandler { default: throw new MetaException("Unknown lock type " + rs.getString("hl_lock_type").charAt(0)); } - txnId = rs.getLong("hl_txnid"); + txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL } LockInfo(ShowLocksResponseElement e, long intLockId) { extLockId = e.getLockid(); @@ -1166,7 +1169,7 @@ public class TxnHandler { @Override public String toString() { - return "extLockId:" + Long.toString(extLockId) + " intLockId:" + + return JavaUtils.lockIdToString(extLockId) + " intLockId:" + intLockId + " txnId:" + Long.toString (txnId) + " db:" + db + " table:" + table + " partition:" + partition + " state:" + (state == null ? "null" : state.toString()) @@ -1642,10 +1645,17 @@ public class TxnHandler { * on a database. */ private boolean ignoreConflict(LockInfo desiredLock, LockInfo existingLock) { - return (desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ && - existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) || - (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ && - desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE); + return + ((desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ && + existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) || + (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ && + desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE)) + || + //different locks from same txn should not conflict with each other + (desiredLock.txnId != 0 && desiredLock.txnId == existingLock.txnId) || + //txnId=0 means it's a select or IUD which does not write to ACID table, e.g + //insert overwrite table T partition(p=1) select a,b from T and autoCommit=true + (desiredLock.txnId == 0 && desiredLock.extLockId == existingLock.extLockId); } private void wait(Connection dbConn, Savepoint save) throws SQLException { http://git-wip-us.apache.org/repos/asf/hive/blob/715018ac/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 805e090..e8c49ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.*; @@ -129,9 +130,9 @@ public class DbLockManager implements HiveLockManager{ public void unlock(HiveLock hiveLock) throws LockException { long lockId = ((DbHiveLock)hiveLock).lockId; try { - LOG.debug("Unlocking id:" + lockId); + LOG.debug("Unlocking " + hiveLock); client.unlock(lockId); - boolean removed = locks.remove((DbHiveLock)hiveLock); + boolean removed = locks.remove(hiveLock); LOG.debug("Removed a lock " + removed); } catch (NoSuchLockException e) { LOG.error("Metastore could find no record of lock " + lockId); @@ -228,6 +229,10 @@ public class DbLockManager implements HiveLockManager{ public int hashCode() { return (int)(lockId % Integer.MAX_VALUE); } + @Override + public String toString() { + return JavaUtils.lockIdToString(lockId); + } } /** http://git-wip-us.apache.org/repos/asf/hive/blob/715018ac/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index ac5ae2a..1431e19 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -44,7 +44,9 @@ public class TestTxnCommands2 { private Driver d; private static enum Table { ACIDTBL("acidTbl"), - NONACIDORCTBL("nonAcidOrcTbl"); + ACIDTBLPART("acidTblPart"), + NONACIDORCTBL("nonAcidOrcTbl"), + NONACIDPART("nonAcidPart"); private final String name; @Override @@ -78,7 +80,9 @@ public class TestTxnCommands2 { d = new Driver(hiveConf); dropTables(); runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); } private void dropTables() throws Exception { for(Table t : Table.values()) { @@ -138,6 +142,27 @@ public class TestTxnCommands2 { Assert.assertEquals("Bulk update2 failed", stringifyValues(updatedData2), rs2); } + @Test + public void testInsertOverwriteWithSelfJoin() throws Exception { + int[][] part1Data = {{1,7}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) " + makeValuesClause(part1Data)); + //this works because logically we need S lock on NONACIDORCTBL to read and X lock to write, but + //LockRequestBuilder dedups locks on the same entity to only keep the highest level lock requested + runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select 2, 9 from " + Table.NONACIDORCTBL + " T inner join " + Table.NONACIDORCTBL + " S on T.a=S.a"); + List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b"); + int[][] joinData = {{2,9}}; + Assert.assertEquals("Self join non-part insert overwrite failed", stringifyValues(joinData), rs); + int[][] part2Data = {{1,8}}; + runStatementOnDriver("insert into " + Table.NONACIDPART + " partition(p=1) (a,b) " + makeValuesClause(part1Data)); + runStatementOnDriver("insert into " + Table.NONACIDPART + " partition(p=2) (a,b) " + makeValuesClause(part2Data)); + //here we need X lock on p=1 partition to write and S lock on 'table' to read which should + //not block each other since they are part of the same txn + runStatementOnDriver("insert overwrite table " + Table.NONACIDPART + " partition(p=1) select a,b from " + Table.NONACIDPART); + List<String> rs2 = runStatementOnDriver("select a,b from " + Table.NONACIDPART + " order by a,b"); + int[][] updatedData = {{1,7},{1,8},{1,8}}; + Assert.assertEquals("Insert overwrite partition failed", stringifyValues(updatedData), rs2); + //insert overwrite not supported for ACID tables + } /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order