Repository: hive Updated Branches: refs/heads/master cc3fd84ee -> 1d7f4b75d
HIVE-13014 RetryingMetaStoreClient is retrying acid related calls too aggressievley(Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d7f4b75 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d7f4b75 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d7f4b75 Branch: refs/heads/master Commit: 1d7f4b75de43db1201f9d494d7a2100f923a7ad6 Parents: cc3fd84 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Tue Jan 24 13:00:43 2017 -0800 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Tue Jan 24 13:00:43 2017 -0800 ---------------------------------------------------------------------- .../common/classification/RetrySemantics.java | 38 ++++ .../hadoop/hive/metastore/IMetaStoreClient.java | 2 + .../hive/metastore/RetryingMetaStoreClient.java | 13 +- .../metastore/txn/CompactionTxnHandler.java | 32 +++- .../hadoop/hive/metastore/txn/TxnHandler.java | 180 ++++++++++++++++--- .../hadoop/hive/metastore/txn/TxnStore.java | 38 +++- .../hive/metastore/txn/TestTxnHandler.java | 42 +++-- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 2 +- .../hive/ql/lockmgr/TestDbTxnManager.java | 21 ++- 9 files changed, 320 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java b/common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java new file mode 100644 index 0000000..abad45e --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java @@ -0,0 +1,38 @@ +package org.apache.hadoop.hive.common.classification; + + +import 
java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * These annotations are meant to indicate how to handle retry logic. + * Initially meant for Metastore API when made across a network, i.e. asynchronously where + * the response may not reach the caller and thus it cannot know if the operation was actually + * performed on the server. + * @see RetryingMetastoreClient + */ +@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate("Hive developer") +public class RetrySemantics { + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface Idempotent { + String[] value() default ""; + int maxRetryCount() default Integer.MAX_VALUE; + int delayMs() default 100; + } + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface ReadOnly {/*trivially retry-able*/} + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface CannotRetry {} + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + public @interface SafeToRetry { + /*may not be Idempotent but is safe to retry*/ + String[] value() default ""; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index fb61db1..84ec332 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import 
org.apache.hadoop.hive.common.classification.InterfaceAudience.Public; import org.apache.hadoop.hive.common.classification.InterfaceStability.Evolving; +import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.annotation.NoReconnect; @@ -1347,6 +1348,7 @@ public interface IMetaStoreClient { * aborted. This can result from the transaction timing out. * @throws TException */ + @RetrySemantics.CannotRetry LockResponse lock(LockRequest request) throws NoSuchTxnException, TxnAbortedException, TException; http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java index 96d8248..a6545a9 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -28,6 +29,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public; @@ -142,6 +144,15 @@ public class RetryingMetaStoreClient implements InvocationHandler { TException caughtException = null; boolean allowReconnect = ! 
method.isAnnotationPresent(NoReconnect.class); + boolean allowRetry = true; + Annotation[] directives = method.getDeclaredAnnotations(); + if(directives != null) { + for(Annotation a : directives) { + if(a instanceof RetrySemantics.CannotRetry) { + allowRetry = false; + } + } + } while (true) { try { @@ -200,7 +211,7 @@ public class RetryingMetaStoreClient implements InvocationHandler { } - if (retriesMade >= retryLimit || base.isLocalMetaStore()) { + if (retriesMade >= retryLimit || base.isLocalMetaStore() || !allowRetry) { throw caughtException; } retriesMade++; http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 545244b..60839fa 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.metastore.txn; +import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -55,6 +56,8 @@ class CompactionTxnHandler extends TxnHandler { * @return list of CompactionInfo structs. These will not have id, type, * or runAs set since these are only potential compactions not actual ones. 
*/ + @Override + @RetrySemantics.ReadOnly public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException { Connection dbConn = null; Set<CompactionInfo> response = new HashSet<CompactionInfo>(); @@ -117,6 +120,8 @@ class CompactionTxnHandler extends TxnHandler { * @param cq_id id of this entry in the queue * @param user user to run the jobs as */ + @Override + @RetrySemantics.Idempotent public void setRunAs(long cq_id, String user) throws MetaException { try { Connection dbConn = null; @@ -154,6 +159,8 @@ class CompactionTxnHandler extends TxnHandler { * @param workerId id of the worker calling this, will be recorded in the db * @return an info element for this compaction request, or null if there is no work to do now. */ + @Override + @RetrySemantics.SafeToRetry public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { Connection dbConn = null; @@ -225,6 +232,8 @@ class CompactionTxnHandler extends TxnHandler { * and put it in the ready to clean state. * @param info info on the compaction entry to mark as compacted. */ + @Override + @RetrySemantics.SafeToRetry public void markCompacted(CompactionInfo info) throws MetaException { try { Connection dbConn = null; @@ -264,6 +273,8 @@ class CompactionTxnHandler extends TxnHandler { * be cleaned. * @return information on the entry in the queue. */ + @Override + @RetrySemantics.ReadOnly public List<CompactionInfo> findReadyToClean() throws MetaException { Connection dbConn = null; List<CompactionInfo> rc = new ArrayList<CompactionInfo>(); @@ -317,6 +328,8 @@ class CompactionTxnHandler extends TxnHandler { * * @param info info on the compaction entry to remove */ + @Override + @RetrySemantics.CannotRetry public void markCleaned(CompactionInfo info) throws MetaException { try { Connection dbConn = null; @@ -429,6 +442,8 @@ class CompactionTxnHandler extends TxnHandler { * txns exist can be that now work was done in this txn (e.g. 
Streaming opened TransactionBatch and * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. */ + @Override + @RetrySemantics.SafeToRetry public void cleanEmptyAbortedTxns() throws MetaException { try { Connection dbConn = null; @@ -493,6 +508,8 @@ class CompactionTxnHandler extends TxnHandler { * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, * so that like hostname% will match the worker id. */ + @Override + @RetrySemantics.Idempotent public void revokeFromLocalWorkers(String hostname) throws MetaException { try { Connection dbConn = null; @@ -535,6 +552,8 @@ class CompactionTxnHandler extends TxnHandler { * @param timeout number of milliseconds since start time that should elapse before a worker is * declared dead. */ + @Override + @RetrySemantics.Idempotent public void revokeTimedoutWorkers(long timeout) throws MetaException { try { Connection dbConn = null; @@ -575,6 +594,8 @@ class CompactionTxnHandler extends TxnHandler { * table level stats are examined. * @throws MetaException */ + @Override + @RetrySemantics.ReadOnly public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException { Connection dbConn = null; Statement stmt = null; @@ -631,6 +652,8 @@ class CompactionTxnHandler extends TxnHandler { * Record the highest txn id that the {@code ci} compaction job will pay attention to. * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids. */ + @Override + @RetrySemantics.Idempotent public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException { Connection dbConn = null; Statement stmt = null; @@ -696,6 +719,8 @@ class CompactionTxnHandler extends TxnHandler { * it's not recent. 
* @throws MetaException */ + @Override + @RetrySemantics.SafeToRetry public void purgeCompactionHistory() throws MetaException { Connection dbConn = null; Statement stmt = null; @@ -762,7 +787,7 @@ class CompactionTxnHandler extends TxnHandler { * this ensures that the number of failed compaction entries retained is > than number of failed * compaction threshold which prevents new compactions from being scheduled. */ - public int getFailedCompactionRetention() { + private int getFailedCompactionRetention() { int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); int failedRetention = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED); if(failedRetention < failedThreshold) { @@ -783,6 +808,8 @@ class CompactionTxnHandler extends TxnHandler { * That would be a meta operations, i.e. first find all partitions for this table (which have * txn info) and schedule each compaction separately. This avoids complications in this logic. */ + @Override + @RetrySemantics.ReadOnly public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { Connection dbConn = null; Statement stmt = null; @@ -829,6 +856,8 @@ class CompactionTxnHandler extends TxnHandler { * If there is no entry in compaction_queue, it means Initiator failed to even schedule a compaction, * which we record as ATTEMPTED_STATE entry in history. 
*/ + @Override + @RetrySemantics.CannotRetry public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure try { @@ -894,6 +923,7 @@ class CompactionTxnHandler extends TxnHandler { } } @Override + @RetrySemantics.Idempotent public void setHadoopJobId(String hadoopJobId, long id) { try { Connection dbConn = null; http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 75a58c6..805db34 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -30,10 +30,10 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.HouseKeeperService; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.dbcp.PoolingDataSource; @@ -101,6 +101,17 @@ import java.util.regex.Pattern; * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is * still valid and active. In the code this is usually achieved at the same time the txn record * is locked for some operation. 
+ * + * Note on retry logic: + * Metastore has retry logic in both {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} + * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}. The retry logic there is very + * generic and is not aware whether the operations are idempotent or not. (This is separate from + * retry logic here in TxnHandler which can/does retry DB errors intelligently). The worst case is + * when an op here issues a successful commit against the RDBMS but the calling stack doesn't + * receive the ack and retries. (If an op fails before commit, it's trivially idempotent.) + * Thus the ops here need to be made idempotent as much as possible or + * the metastore call stack should have logic not to retry. There are {@link RetrySemantics} + * annotations to document the behavior. */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -120,6 +131,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // Transaction states static final protected char TXN_ABORTED = 'a'; static final protected char TXN_OPEN = 'o'; + //todo: make these like OperationType and remove above char constants + enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN} // Lock states static final protected char LOCK_ACQUIRED = 'a'; @@ -263,7 +276,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dumpConfig = false; } } - + @Override + @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { try { // We need to figure out the current transaction number and the list of @@ -339,7 +353,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return getOpenTxnsInfo(); } } - + @Override + @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException { try { // We need to figure out the current transaction number and the list of @@ -414,6 +429,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } + /** + * Retry-by-caller note: +
Worst case, it will leave an open txn which will timeout. + */ + @Override + @RetrySemantics.Idempotent public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { if (openTxnsCounter == null) { synchronized (TxnHandler.class) { @@ -515,7 +536,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return openTxns(rqst); } } - + @Override + @RetrySemantics.Idempotent public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException { long txnid = rqst.getTxnid(); try { @@ -525,10 +547,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) { - LOG.debug("Going to rollback"); - dbConn.rollback(); stmt = dbConn.createStatement(); - ensureValidTxn(dbConn, txnid, stmt); + TxnStatus status = findTxnState(txnid,stmt); + if(status == TxnStatus.ABORTED) { + LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) + + ") requested by it is already " + TxnStatus.ABORTED); + return; + } + raiseTxnUnexpectedState(status, txnid); } LOG.debug("Going to commit"); @@ -547,7 +573,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { abortTxn(rqst); } } - + @Override + @RetrySemantics.Idempotent public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException { List<Long> txnids = rqst.getTxn_ids(); try { @@ -556,7 +583,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); int numAborted = abortTxns(dbConn, txnids, false); if (numAborted != txnids.size()) { - LOG.warn("Abort Transactions command only abort " + numAborted + " out of " + + LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " + txnids.size() + " transactions. 
It's possible that the other " + (txnids.size() - numAborted) + " transactions have been aborted or committed, or the transaction ids are invalid."); @@ -602,6 +629,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed) * 'x' would be updated to the same value by both, i.e. lost update. */ + @Override + @RetrySemantics.Idempotent("No-op if already committed") public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { long txnid = rqst.getTxnid(); @@ -622,9 +651,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { */ lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); if (lockHandle == null) { - //this also ensures that txn is still there and in expected state (hasn't been timed out) - ensureValidTxn(dbConn, txnid, stmt); + //if here, txn was not found (in expected state) + TxnStatus actualTxnStatus = findTxnState(txnid, stmt); + if(actualTxnStatus == TxnStatus.COMMITTED) { + /** + * This makes the operation idempotent + * (assume that this is most likely due to retry logic) + */ + LOG.info("Nth commitTxn(" + JavaUtils.txnIdToString(txnid) + ") msg"); + return; + } + raiseTxnUnexpectedState(actualTxnStatus, txnid); shouldNeverHappen(txnid); + //dbConn is rolled back in finally{} } String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")"; @@ -645,9 +684,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } long commitId = commitIdRs.getLong(1); Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint(); + /** + * "select distinct" is used below because + * 1. once we get to multi-statement txns, we only care to record that something was updated once + * 2. 
if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it may create + * duplicate entries in TXN_COMPONENTS + * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct + * even if it includes all of its columns + */ int numCompsWritten = stmt.executeUpdate( "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" + - " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); + " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix); /** * see if there are any overlapping txns wrote the same element, i.e. have a conflict * Since entire commit operation is mutexed wrt other start/commit ops, @@ -750,6 +797,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } @Override + @RetrySemantics.SafeToRetry public void performWriteSetGC() { Connection dbConn = null; Statement stmt = null; @@ -798,7 +846,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * connection (but separate transactions). This avoid some flakiness in BONECP where if you * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one * doesn't see results of the first. + * + * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case + * there will be a duplicate set of locks but both sets will belong to the same txn so they + * will not conflict with each other. For locks w/o txn context (i.e. read-only query), this + * may lead to deadlock (at least a long wait). (e.g. 1st call creates locks in {@code LOCK_WAITING} + * mode and response gets lost. Then {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient} + * retries, and enqueues another set of locks in LOCK_WAITING.
The 2nd LockResponse is delivered + * to the DbLockManager, which will keep doing {@link #checkLock(CheckLockRequest)} until the 1st + * set of locks times out. */ + @RetrySemantics.CannotRetry public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); try { @@ -1063,7 +1121,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking * against doesn't move from W to A in another txn) but this method can heartbeat in * separate txn at READ_COMMITTED. + * + * Retry-by-caller note: + * Retryable because {@link #checkLock(Connection, long)} is also safe to retry */ + @Override + @RetrySemantics.SafeToRetry public LockResponse checkLock(CheckLockRequest rqst) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { @@ -1112,6 +1175,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will be locked as needed by db. * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed */ + @RetrySemantics.Idempotent public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException { try { @@ -1146,11 +1210,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if(info == null) { //didn't find any lock with extLockId but at ReadCommitted there is a possibility that //it existed when above delete ran but it didn't have the expected state.
- LOG.error("No lock in " + LOCK_WAITING + " mode found for unlock(" + rqst + ")"); - throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + LOG.info("No lock in " + LOCK_WAITING + " mode found for unlock(" + + JavaUtils.lockIdToString(rqst.getLockid()) + ")"); + //bail here to make the operation idempotent + return; } if(info.txnId != 0) { String msg = "Unlocking locks associated with transaction not permitted. " + info; + //if a lock is associated with a txn we can only "unlock" if it's in WAITING state + // which really means that the caller wants to give up waiting for the lock LOG.error(msg); throw new TxnOpenException(msg); } @@ -1189,6 +1257,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { this.e = e; } } + @RetrySemantics.ReadOnly public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { try { Connection dbConn = null; @@ -1297,6 +1366,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * {@code ids} should only have txnid or lockid but not both, ideally. * Currently DBTxnManager.heartbeat() enforces this. */ + @Override + @RetrySemantics.SafeToRetry public void heartbeat(HeartbeatRequest ids) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { @@ -1318,7 +1389,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { heartbeat(ids); } } - + @Override + @RetrySemantics.SafeToRetry public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) throws MetaException { try { @@ -1379,6 +1451,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return id; } @Override + @RetrySemantics.Idempotent public CompactionResponse compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue.
try { @@ -1503,6 +1576,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { return Character.toString(s); } } + @RetrySemantics.ReadOnly public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException { ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>()); Connection dbConn = null; @@ -1574,6 +1648,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { + JavaUtils.lockIdToString(extLockId) + " " + intLockId); } + /** + * Retry-by-caller note: + * This may be retried after dbConn.commit. At worst, it will create duplicate entries in + * TXN_COMPONENTS which won't affect anything. See more comments in {@link #commitTxn(CommitTxnRequest)} + */ + @Override + @RetrySemantics.SafeToRetry public void addDynamicPartitions(AddDynamicPartitions rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { Connection dbConn = null; @@ -1629,7 +1710,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { /** * Clean up corresponding records in metastore tables when corresponding object is dropped, * specifically: TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS + * Retry-by-caller note: this is only idempotent assuming it's only called by dropTable/Db/etc + * operations. */ + @Override + @RetrySemantics.Idempotent public void cleanupRecords(HiveObjectType type, Database db, Table table, Iterator<Partition> partitionIterator) throws MetaException { try { @@ -2322,6 +2407,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid, * checkLock() will in the worst case keep locks in Waiting state a little longer. 
*/ + @RetrySemantics.SafeToRetry("See @SafeToRetry") private LockResponse checkLock(Connection dbConn, long extLockId) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { TxnStore.MutexAPI.LockHandle handle = null; @@ -2331,7 +2417,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { /** * todo: Longer term we should pass this from client somehow - this would be an optimization; once * that is in place make sure to build and test "writeSet" below using OperationType not LockType - * With SP we assume that the query modifies exactly the partitions it locked. (not entirely + * With Static Partitions we assume that the query modifies exactly the partitions it locked. (not entirely * realistic since Update/Delete may have some predicate that filters out all records out of * some partition(s), but plausible). For DP, we acquire locks very wide (all known partitions), * but for most queries only a fraction will actually be updated. #addDynamicPartitions() tells @@ -2517,6 +2603,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // If we've found it and it's already been marked acquired, // then just look at the other locks. if (locks[index].state == LockState.ACQUIRED) { + /**this is what makes this method @SafeToRetry*/ continue; } @@ -2720,6 +2807,55 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } + /** + * Returns the state of the transaction iff it's able to determine it. Some cases where it cannot: + * 1. txnid was Aborted/Committed and then GC'd (compacted) + * 2. 
txnid was committed but it didn't modify anything (nothing in COMPLETED_TXN_COMPONENTS) + */ + private TxnStatus findTxnState(long txnid, Statement stmt) throws SQLException, MetaException { + String s = "select txn_state from TXNS where txn_id = " + txnid; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + s = sqlGenerator.addLimitClause(1, "1 from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid); + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs2 = stmt.executeQuery(s); + if(rs2.next()) { + return TxnStatus.COMMITTED; + } + //could also check WRITE_SET but that seems overkill + return TxnStatus.UNKNOWN; + } + char txnState = rs.getString(1).charAt(0); + if (txnState == TXN_ABORTED) { + return TxnStatus.ABORTED; + } + assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN"; + return TxnStatus.OPEN; + } + + /** + * Used to raise an informative error when the caller expected a txn in a particular TxnStatus + * but found it in some other status + */ + private static void raiseTxnUnexpectedState(TxnStatus actualStatus, long txnid) + throws NoSuchTxnException, TxnAbortedException { + switch (actualStatus) { + case ABORTED: + throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + " already aborted"); + case COMMITTED: + throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); + case UNKNOWN: + throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid)); + case OPEN: + throw new NoSuchTxnException(JavaUtils.txnIdToString(txnid) + " is " + TxnStatus.OPEN); + default: + throw new IllegalArgumentException("Unknown TxnStatus " + actualStatus); + } + } + /** + * Checks that the transaction with {@code txnid} is valid and open; if it is missing, already committed or aborted, rolls back {@code dbConn} and throws {@link NoSuchTxnException} or {@link TxnAbortedException}.
+ */ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt) throws SQLException, NoSuchTxnException, TxnAbortedException { // We need to check whether this transaction is valid and open @@ -2734,7 +2870,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { //possible for for multi-stmt txns boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; LOG.debug("Going to rollback"); - dbConn.rollback(); + rollbackDBConn(dbConn); if(alreadyCommitted) { //makes the message more informative - helps to find bugs in client code throw new NoSuchTxnException("Transaction " + JavaUtils.txnIdToString(txnid) + " is already committed."); @@ -2743,7 +2879,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } if (rs.getString(1).charAt(0) == TXN_ABORTED) { LOG.debug("Going to rollback"); - dbConn.rollback(); + rollbackDBConn(dbConn); throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + " already aborted");//todo: add time of abort, which is not currently tracked. Requires schema change } @@ -2869,6 +3005,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { * Will also delete locks which are not associated with a transaction and have timed out * Tries to keep transactions (against metastore db) small to reduce lock contention. 
*/ + @RetrySemantics.Idempotent public void performTimeOuts() { Connection dbConn = null; Statement stmt = null; @@ -2939,7 +3076,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { close(rs, stmt, dbConn); } } - + @Override + @RetrySemantics.ReadOnly public void countOpenTxns() throws MetaException { Connection dbConn = null; Statement stmt = null; @@ -3244,6 +3382,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } @Override + @RetrySemantics.Idempotent public MutexAPI getMutexAPI() { return this; } @@ -3360,6 +3499,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** * Helper class that generates SQL queries with syntax specific to target DB + * todo: why throw MetaException? */ @VisibleForTesting static final class SQLGenerator { @@ -3459,7 +3599,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } /** * Suppose you have a query "select a,b from T" and you want to limit the result set - * to the first 5 rows. The mechanism to do that differs in different DB. + * to the first 5 rows. The mechanism to do that differs in different DBs. * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will return the * appropriately modified row limiting query. 
* http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 879ae55..041d55b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.txn; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.*; @@ -68,6 +69,7 @@ public interface TxnStore { * @return information about open transactions * @throws MetaException */ + @RetrySemantics.ReadOnly public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException; /** @@ -75,12 +77,14 @@ public interface TxnStore { * @return list of open transactions, as well as a high water mark. * @throws MetaException */ + @RetrySemantics.ReadOnly public GetOpenTxnsResponse getOpenTxns() throws MetaException; /** * Get the count for open transactions. 
* @throws MetaException */ + @RetrySemantics.ReadOnly public void countOpenTxns() throws MetaException; /** @@ -89,6 +93,7 @@ public interface TxnStore { * @return information on opened transactions * @throws MetaException */ + @RetrySemantics.Idempotent public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException; /** @@ -97,6 +102,7 @@ public interface TxnStore { * @throws NoSuchTxnException * @throws MetaException */ + @RetrySemantics.Idempotent public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException; /** @@ -105,6 +111,7 @@ public interface TxnStore { * @throws NoSuchTxnException * @throws MetaException */ + @RetrySemantics.Idempotent public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException; /** @@ -114,6 +121,7 @@ public interface TxnStore { * @throws TxnAbortedException * @throws MetaException */ + @RetrySemantics.Idempotent public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException; @@ -126,6 +134,7 @@ public interface TxnStore { * @throws TxnAbortedException * @throws MetaException */ + @RetrySemantics.CannotRetry public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException; @@ -139,6 +148,7 @@ public interface TxnStore { * @throws TxnAbortedException * @throws MetaException */ + @RetrySemantics.SafeToRetry public LockResponse checkLock(CheckLockRequest rqst) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; @@ -151,6 +161,7 @@ public interface TxnStore { * @throws TxnOpenException * @throws MetaException */ + @RetrySemantics.Idempotent public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException; @@ -160,6 +171,7 @@ public interface TxnStore { * @return lock information. 
* @throws MetaException */ + @RetrySemantics.ReadOnly public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException; /** @@ -170,6 +182,7 @@ public interface TxnStore { * @throws TxnAbortedException * @throws MetaException */ + @RetrySemantics.SafeToRetry public void heartbeat(HeartbeatRequest ids) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException; @@ -179,6 +192,7 @@ public interface TxnStore { * @return info on txns that were heartbeated * @throws MetaException */ + @RetrySemantics.SafeToRetry public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) throws MetaException; @@ -189,6 +203,7 @@ public interface TxnStore { * @return id of the compaction that has been started or existing id if this resource is already scheduled * @throws MetaException */ + @RetrySemantics.Idempotent public CompactionResponse compact(CompactionRequest rqst) throws MetaException; /** @@ -197,6 +212,7 @@ public interface TxnStore { * @return compaction information * @throws MetaException */ + @RetrySemantics.ReadOnly public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException; /** @@ -206,6 +222,7 @@ public interface TxnStore { * @throws TxnAbortedException * @throws MetaException */ + @RetrySemantics.SafeToRetry public void addDynamicPartitions(AddDynamicPartitions rqst) throws NoSuchTxnException, TxnAbortedException, MetaException; @@ -217,12 +234,14 @@ public interface TxnStore { * @param partitionIterator partition iterator * @throws MetaException */ + @RetrySemantics.Idempotent public void cleanupRecords(HiveObjectType type, Database db, Table table, Iterator<Partition> partitionIterator) throws MetaException; /** * Timeout transactions and/or locks. This should only be called by the compactor. */ + @RetrySemantics.Idempotent public void performTimeOuts(); /** @@ -234,6 +253,7 @@ public interface TxnStore { * @return list of CompactionInfo structs. 
These will not have id, type, * or runAs set since these are only potential compactions not actual ones. */ + @RetrySemantics.ReadOnly public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException; /** @@ -242,6 +262,7 @@ public interface TxnStore { * @param cq_id id of this entry in the queue * @param user user to run the jobs as */ + @RetrySemantics.Idempotent public void setRunAs(long cq_id, String user) throws MetaException; /** @@ -250,6 +271,7 @@ public interface TxnStore { * @param workerId id of the worker calling this, will be recorded in the db * @return an info element for this compaction request, or null if there is no work to do now. */ + @RetrySemantics.ReadOnly public CompactionInfo findNextToCompact(String workerId) throws MetaException; /** @@ -257,6 +279,7 @@ public interface TxnStore { * and put it in the ready to clean state. * @param info info on the compaction entry to mark as compacted. */ + @RetrySemantics.SafeToRetry public void markCompacted(CompactionInfo info) throws MetaException; /** @@ -264,6 +287,7 @@ public interface TxnStore { * be cleaned. * @return information on the entry in the queue. */ + @RetrySemantics.ReadOnly public List<CompactionInfo> findReadyToClean() throws MetaException; /** @@ -272,6 +296,7 @@ public interface TxnStore { * * @param info info on the compaction entry to remove */ + @RetrySemantics.CannotRetry public void markCleaned(CompactionInfo info) throws MetaException; /** @@ -281,6 +306,7 @@ public interface TxnStore { * @param info information on the compaction that failed. * @throws MetaException */ + @RetrySemantics.CannotRetry public void markFailed(CompactionInfo info) throws MetaException; /** @@ -288,6 +314,7 @@ public interface TxnStore { * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. 
*/ + @RetrySemantics.SafeToRetry public void cleanEmptyAbortedTxns() throws MetaException; /** @@ -299,6 +326,7 @@ public interface TxnStore { * @param hostname Name of this host. It is assumed this prefixes the thread's worker id, * so that like hostname% will match the worker id. */ + @RetrySemantics.Idempotent public void revokeFromLocalWorkers(String hostname) throws MetaException; /** @@ -310,6 +338,7 @@ public interface TxnStore { * @param timeout number of milliseconds since start time that should elapse before a worker is * declared dead. */ + @RetrySemantics.Idempotent public void revokeTimedoutWorkers(long timeout) throws MetaException; /** @@ -318,11 +347,13 @@ public interface TxnStore { * table level stats are examined. * @throws MetaException */ + @RetrySemantics.ReadOnly public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException; /** * Record the highest txn id that the {@code ci} compaction job will pay attention to. */ + @RetrySemantics.Idempotent public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException; /** @@ -333,12 +364,14 @@ public interface TxnStore { * it's not recent. * @throws MetaException */ + @RetrySemantics.SafeToRetry public void purgeCompactionHistory() throws MetaException; /** * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the * transaction metadata once it becomes unnecessary. */ + @RetrySemantics.SafeToRetry public void performWriteSetGC(); /** @@ -349,6 +382,7 @@ public interface TxnStore { * @return true if it is ok to compact, false if there have been too many failures. 
* @throws MetaException */ + @RetrySemantics.ReadOnly public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException; @VisibleForTesting @@ -357,6 +391,7 @@ public interface TxnStore { @VisibleForTesting long setTimeout(long milliseconds); + @RetrySemantics.Idempotent public MutexAPI getMutexAPI(); /** @@ -382,7 +417,7 @@ public interface TxnStore { public void acquireLock(String key, LockHandle handle) throws MetaException; public static interface LockHandle { /** - * Releases all locks associcated with this handle. + * Releases all locks associated with this handle. */ public void releaseLocks(); } @@ -393,5 +428,6 @@ public interface TxnStore { * it calls this to update the metadata. * @param id {@link CompactionInfo#id} */ + @RetrySemantics.Idempotent public void setHadoopJobId(String hadoopJobId, long id); } http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 11cedb9..adfe98a 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.CompactionRequest; @@ -139,58 +140,69 @@ public class TestTxnHandler { @Test public void testAbortTxn() throws Exception { - OpenTxnsResponse openedTxns = 
txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost")); + OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); List<Long> txnList = openedTxns.getTxn_ids(); long first = txnList.get(0); assertEquals(1L, first); long second = txnList.get(1); assertEquals(2L, second); txnHandler.abortTxn(new AbortTxnRequest(1)); + List<String> parts = new ArrayList<String>(); + parts.add("p=1"); + AddDynamicPartitions adp = new AddDynamicPartitions(3, "default", "T", parts); + adp.setOperationType(DataOperationType.INSERT); + txnHandler.addDynamicPartitions(adp); GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); - assertEquals(2L, txnsInfo.getTxn_high_water_mark()); - assertEquals(2, txnsInfo.getOpen_txns().size()); + assertEquals(3, txnsInfo.getTxn_high_water_mark()); + assertEquals(3, txnsInfo.getOpen_txns().size()); assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId()); assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState()); assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId()); assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState()); + assertEquals(3, txnsInfo.getOpen_txns().get(2).getId()); + assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(2).getState()); GetOpenTxnsResponse txns = txnHandler.getOpenTxns(); - assertEquals(2L, txns.getTxn_high_water_mark()); - assertEquals(2, txns.getOpen_txns().size()); - boolean[] saw = new boolean[3]; + assertEquals(3, txns.getTxn_high_water_mark()); + assertEquals(3, txns.getOpen_txns().size()); + boolean[] saw = new boolean[4]; for (int i = 0; i < saw.length; i++) saw[i] = false; for (Long tid : txns.getOpen_txns()) { saw[tid.intValue()] = true; } for (int i = 1; i < saw.length; i++) assertTrue(saw[i]); txnHandler.commitTxn(new CommitTxnRequest(2)); + //this succeeds as abortTxn is idempotent + txnHandler.abortTxn(new AbortTxnRequest(1)); boolean gotException = false; try { - txnHandler.abortTxn(new AbortTxnRequest(1)); + 
txnHandler.abortTxn(new AbortTxnRequest(2)); } - catch(TxnAbortedException ex) { + catch(NoSuchTxnException ex) { gotException = true; - Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(1) + " already aborted", ex.getMessage()); + //if this wasn't an empty txn, we'd get a better msg + Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage()); } Assert.assertTrue(gotException); gotException = false; + txnHandler.commitTxn(new CommitTxnRequest(3)); try { - txnHandler.abortTxn(new AbortTxnRequest(2)); + txnHandler.abortTxn(new AbortTxnRequest(3)); } catch(NoSuchTxnException ex) { gotException = true; - //if this wasn't an empty txn, we'd get a better msg - //Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " already committed.", ex.getMessage()); - Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage()); + //txn 3 is not empty txn, so we get a better msg + Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage()); } Assert.assertTrue(gotException); + gotException = false; try { - txnHandler.abortTxn(new AbortTxnRequest(3)); + txnHandler.abortTxn(new AbortTxnRequest(4)); } catch(NoSuchTxnException ex) { gotException = true; - Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(3), ex.getMessage()); + Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(4), ex.getMessage()); } Assert.assertTrue(gotException); } http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 5932d7e..af1f962 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ 
-280,7 +280,7 @@ public class TestTxnCommands2 { runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b,c) " + makeValuesClause(moreTableData)); List<String> rs0 = runStatementOnDriver("select a,b,c from " + Table.ACIDTBL + " where a > 0 order by a,b,c"); } - @Ignore("not needed but useful for testing") +// @Ignore("not needed but useful for testing") @Test public void testNonAcidInsert() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 8f26099..6c53538 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -20,11 +20,14 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.api.TxnState; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; @@ -47,6 +50,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static junit.framework.Assert.assertEquals; + 
/** * Unit tests for {@link DbTxnManager}. * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2} @@ -229,15 +234,13 @@ public class TestDbTxnManager { exception = null; ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2); Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); - runReaper(); - try { - txnMgr.rollbackTxn(); - } - catch (LockException ex) { - exception = ex; - } - Assert.assertNotNull("Expected exception2", exception); - Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); + runReaper();//this will abort the txn + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo(); + assertEquals(2, txnsInfo.getTxn_high_water_mark()); + assertEquals(2, txnsInfo.getOpen_txns().size()); + Assert.assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(1).getState()); + txnMgr.rollbackTxn();//this is idempotent } @Test