Repository: hive
Updated Branches:
  refs/heads/master cc3fd84ee -> 1d7f4b75d


HIVE-13014 RetryingMetaStoreClient is retrying acid related calls too 
aggressively (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1d7f4b75
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1d7f4b75
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1d7f4b75

Branch: refs/heads/master
Commit: 1d7f4b75de43db1201f9d494d7a2100f923a7ad6
Parents: cc3fd84
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Tue Jan 24 13:00:43 2017 -0800
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Tue Jan 24 13:00:43 2017 -0800

----------------------------------------------------------------------
 .../common/classification/RetrySemantics.java   |  38 ++++
 .../hadoop/hive/metastore/IMetaStoreClient.java |   2 +
 .../hive/metastore/RetryingMetaStoreClient.java |  13 +-
 .../metastore/txn/CompactionTxnHandler.java     |  32 +++-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 180 ++++++++++++++++---
 .../hadoop/hive/metastore/txn/TxnStore.java     |  38 +++-
 .../hive/metastore/txn/TestTxnHandler.java      |  42 +++--
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   2 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  21 ++-
 9 files changed, 320 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java
----------------------------------------------------------------------
diff --git 
a/common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java
 
b/common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java
new file mode 100644
index 0000000..abad45e
--- /dev/null
+++ 
b/common/src/java/org/apache/hadoop/hive/common/classification/RetrySemantics.java
@@ -0,0 +1,38 @@
+package org.apache.hadoop.hive.common.classification;
+
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotations that document how retry logic should treat a method.
+ * Initially meant for Metastore API calls made across a network, i.e. asynchronously, where
+ * the response may not reach the caller and thus the caller cannot know if the operation was
+ * actually performed on the server.
+ * <p>
+ * All annotations declared here are retained at runtime and may only be placed on methods:
+ * <ul>
+ * <li>{@code Idempotent} - {@code value} is free-form text (e.g. a note on why the call is
+ *   idempotent); {@code maxRetryCount} defaults to {@code Integer.MAX_VALUE} and
+ *   {@code delayMs} to 100.  NOTE(review): presumably a retry budget and a delay between
+ *   retries in milliseconds - confirm against whatever code ends up reading them.</li>
+ * <li>{@code ReadOnly} - trivially retry-able.</li>
+ * <li>{@code CannotRetry} - the call must not be retried by the caller.</li>
+ * <li>{@code SafeToRetry} - may not be idempotent but is safe to retry; {@code value} is
+ *   free-form text.</li>
+ * </ul>
+ * Note that {@code value} defaults to an array holding a single empty string, not to an
+ * empty array.
+ * @see org.apache.hadoop.hive.metastore.RetryingMetaStoreClient
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.LimitedPrivate("Hive developer")
+public class RetrySemantics {
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface Idempotent {
+    String[] value() default "";
+    int maxRetryCount() default Integer.MAX_VALUE;
+    int delayMs() default 100;
+  }
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface ReadOnly {/*trivially retry-able*/}
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface CannotRetry {}
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface SafeToRetry {
+    /*may not be Idempotent but is safe to retry*/
+    String[] value() default "";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index fb61db1..84ec332 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
 import 
org.apache.hadoop.hive.common.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
@@ -1347,6 +1348,7 @@ public interface IMetaStoreClient {
    * aborted.  This can result from the transaction timing out.
    * @throws TException
    */
+  @RetrySemantics.CannotRetry
   LockResponse lock(LockRequest request)
       throws NoSuchTxnException, TxnAbortedException, TException;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
index 96d8248..a6545a9 100644
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.io.IOException;
+import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -28,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
@@ -142,6 +144,15 @@ public class RetryingMetaStoreClient implements 
InvocationHandler {
     TException caughtException = null;
 
     boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class);
+    boolean allowRetry = true;
+    Annotation[] directives = method.getDeclaredAnnotations();
+    if(directives != null) {
+      for(Annotation a : directives) {
+        if(a instanceof RetrySemantics.CannotRetry) {
+          allowRetry = false;
+        }
+      }
+    }
 
     while (true) {
       try {
@@ -200,7 +211,7 @@ public class RetryingMetaStoreClient implements 
InvocationHandler {
       }
 
 
-      if (retriesMade >= retryLimit || base.isLocalMetaStore()) {
+      if (retriesMade >= retryLimit || base.isLocalMetaStore() || !allowRetry) 
{
         throw caughtException;
       }
       retriesMade++;

http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 545244b..60839fa 100644
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -55,6 +56,8 @@ class CompactionTxnHandler extends TxnHandler {
    * @return list of CompactionInfo structs.  These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
    */
+  @Override
+  @RetrySemantics.ReadOnly
   public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws 
MetaException {
     Connection dbConn = null;
     Set<CompactionInfo> response = new HashSet<CompactionInfo>();
@@ -117,6 +120,8 @@ class CompactionTxnHandler extends TxnHandler {
    * @param cq_id id of this entry in the queue
    * @param user user to run the jobs as
    */
+  @Override
+  @RetrySemantics.Idempotent
   public void setRunAs(long cq_id, String user) throws MetaException {
     try {
       Connection dbConn = null;
@@ -154,6 +159,8 @@ class CompactionTxnHandler extends TxnHandler {
    * @param workerId id of the worker calling this, will be recorded in the db
    * @return an info element for this compaction request, or null if there is 
no work to do now.
    */
+  @Override
+  @RetrySemantics.SafeToRetry
   public CompactionInfo findNextToCompact(String workerId) throws 
MetaException {
     try {
       Connection dbConn = null;
@@ -225,6 +232,8 @@ class CompactionTxnHandler extends TxnHandler {
    * and put it in the ready to clean state.
    * @param info info on the compaction entry to mark as compacted.
    */
+  @Override
+  @RetrySemantics.SafeToRetry
   public void markCompacted(CompactionInfo info) throws MetaException {
     try {
       Connection dbConn = null;
@@ -264,6 +273,8 @@ class CompactionTxnHandler extends TxnHandler {
    * be cleaned.
    * @return information on the entry in the queue.
    */
+  @Override
+  @RetrySemantics.ReadOnly
   public List<CompactionInfo> findReadyToClean() throws MetaException {
     Connection dbConn = null;
     List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
@@ -317,6 +328,8 @@ class CompactionTxnHandler extends TxnHandler {
    * 
    * @param info info on the compaction entry to remove
    */
+  @Override
+  @RetrySemantics.CannotRetry
   public void markCleaned(CompactionInfo info) throws MetaException {
     try {
       Connection dbConn = null;
@@ -429,6 +442,8 @@ class CompactionTxnHandler extends TxnHandler {
    * txns exist can be that now work was done in this txn (e.g. Streaming 
opened TransactionBatch and
    * abandoned it w/o doing any work) or due to {@link 
#markCleaned(CompactionInfo)} being called.
    */
+  @Override
+  @RetrySemantics.SafeToRetry
   public void cleanEmptyAbortedTxns() throws MetaException {
     try {
       Connection dbConn = null;
@@ -493,6 +508,8 @@ class CompactionTxnHandler extends TxnHandler {
    * @param hostname Name of this host.  It is assumed this prefixes the 
thread's worker id,
    *                 so that like hostname% will match the worker id.
    */
+  @Override
+  @RetrySemantics.Idempotent
   public void revokeFromLocalWorkers(String hostname) throws MetaException {
     try {
       Connection dbConn = null;
@@ -535,6 +552,8 @@ class CompactionTxnHandler extends TxnHandler {
    * @param timeout number of milliseconds since start time that should elapse 
before a worker is
    *                declared dead.
    */
+  @Override
+  @RetrySemantics.Idempotent
   public void revokeTimedoutWorkers(long timeout) throws MetaException {
     try {
       Connection dbConn = null;
@@ -575,6 +594,8 @@ class CompactionTxnHandler extends TxnHandler {
    * table level stats are examined.
    * @throws MetaException
    */
+  @Override
+  @RetrySemantics.ReadOnly
   public List<String> findColumnsWithStats(CompactionInfo ci) throws 
MetaException {
     Connection dbConn = null;
     Statement stmt = null;
@@ -631,6 +652,8 @@ class CompactionTxnHandler extends TxnHandler {
    * Record the highest txn id that the {@code ci} compaction job will pay 
attention to.
    * This is the highest resolved txn id, i.e. such that there are no open 
txns with lower ids.
    */
+  @Override
+  @RetrySemantics.Idempotent
   public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) 
throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
@@ -696,6 +719,8 @@ class CompactionTxnHandler extends TxnHandler {
    * it's not recent.
    * @throws MetaException
    */
+  @Override
+  @RetrySemantics.SafeToRetry
   public void purgeCompactionHistory() throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
@@ -762,7 +787,7 @@ class CompactionTxnHandler extends TxnHandler {
    * this ensures that the number of failed compaction entries retained is > 
than number of failed
    * compaction threshold which prevents new compactions from being scheduled.
    */
-  public int getFailedCompactionRetention() {
+  private int getFailedCompactionRetention() {
     int failedThreshold = 
conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
     int failedRetention = 
conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
     if(failedRetention < failedThreshold) {
@@ -783,6 +808,8 @@ class CompactionTxnHandler extends TxnHandler {
    * That would be a meta operations, i.e. first find all partitions for this 
table (which have 
    * txn info) and schedule each compaction separately.  This avoids 
complications in this logic.
    */
+  @Override
+  @RetrySemantics.ReadOnly
   public boolean checkFailedCompactions(CompactionInfo ci) throws 
MetaException {
     Connection dbConn = null;
     Statement stmt = null;
@@ -829,6 +856,8 @@ class CompactionTxnHandler extends TxnHandler {
    * If there is no entry in compaction_queue, it means Initiator failed to 
even schedule a compaction,
    * which we record as ATTEMPTED_STATE entry in history.
    */
+  @Override
+  @RetrySemantics.CannotRetry
   public void markFailed(CompactionInfo ci) throws MetaException {//todo: this 
should not throw
     //todo: this should take "comment" as parameter to set in CC_META_INFO to 
provide some context for the failure
     try {
@@ -894,6 +923,7 @@ class CompactionTxnHandler extends TxnHandler {
     }
   }
   @Override
+  @RetrySemantics.Idempotent
   public void setHadoopJobId(String hadoopJobId, long id) {
     try {
       Connection dbConn = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 75a58c6..805db34 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -30,10 +30,10 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.DatabaseProduct;
 import org.apache.hadoop.hive.metastore.HouseKeeperService;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.dbcp.PoolingDataSource;
@@ -101,6 +101,17 @@ import java.util.regex.Pattern;
  * It's imperative that any operation on a txn (e.g. commit), ensure 
(atomically) that this txn is
  * still valid and active.  In the code this is usually achieved at the same 
time the txn record
  * is locked for some operation.
+ * 
+ * Note on retry logic:
+ * Metastore has retry logic in both {@link 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient}
+ * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}.  The retry 
logic there is very
+ * generic and is not aware whether the operations are idempotent or not.  
(This is separate from
+ * retry logic here in TxnHandler which can/does retry DB errors 
intelligently).  The worst case is
+ * when an op here issues a successful commit against the RDBMS but the 
calling stack doesn't
+ * receive the ack and retries.  (If an op fails before commit, it's trivially 
idempotent)
+ * Thus the ops here need to be made idempotent as much as possible or
+ * the metastore call stack should have logic not to retry.  There are {@link 
RetrySemantics}
+ * annotations to document the behavior.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -120,6 +131,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   // Transaction states
   static final protected char TXN_ABORTED = 'a';
   static final protected char TXN_OPEN = 'o';
+  //todo: make these like OperationType and remove the char constants above (TXN_ABORTED, TXN_OPEN)
+  enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN}
 
   // Lock states
   static final protected char LOCK_ACQUIRED = 'a';
@@ -263,7 +276,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       dumpConfig = false;
     }
   }
-
+  @Override
+  @RetrySemantics.ReadOnly
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
     try {
       // We need to figure out the current transaction number and the list of
@@ -339,7 +353,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       return getOpenTxnsInfo();
     }
   }
-
+  @Override
+  @RetrySemantics.ReadOnly
   public GetOpenTxnsResponse getOpenTxns() throws MetaException {
     try {
       // We need to figure out the current transaction number and the list of
@@ -414,6 +429,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
   }
 
+  /**
+   * Retry-by-caller note:
+   * Worst case, it will leave an open txn which will timeout.
+   */
+  @Override
+  @RetrySemantics.Idempotent
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
     if (openTxnsCounter == null) {
       synchronized (TxnHandler.class) {
@@ -515,7 +536,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       return openTxns(rqst);
     }
   }
-
+  @Override
+  @RetrySemantics.Idempotent
   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, 
MetaException, TxnAbortedException {
     long txnid = rqst.getTxnid();
     try {
@@ -525,10 +547,14 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
           stmt = dbConn.createStatement();
-          ensureValidTxn(dbConn, txnid, stmt);
+          TxnStatus status = findTxnState(txnid,stmt);
+          if(status == TxnStatus.ABORTED) {
+            LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) +
+              ") requested by it is already " + TxnStatus.ABORTED);
+            return;
+          }
+          raiseTxnUnexpectedState(status, txnid);
         }
 
         LOG.debug("Going to commit");
@@ -547,7 +573,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       abortTxn(rqst);
     }
   }
-
+  @Override
+  @RetrySemantics.Idempotent
   public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, 
MetaException {
     List<Long> txnids = rqst.getTxn_ids();
     try {
@@ -556,7 +583,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         int numAborted = abortTxns(dbConn, txnids, false);
         if (numAborted != txnids.size()) {
-          LOG.warn("Abort Transactions command only abort " + numAborted + " 
out of " +
+          LOG.warn("Abort Transactions command only aborted " + numAborted + " 
out of " +
               txnids.size() + " transactions. It's possible that the other " +
               (txnids.size() - numAborted) +
               " transactions have been aborted or committed, or the 
transaction ids are invalid.");
@@ -602,6 +629,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed)
    * 'x' would be updated to the same value by both, i.e. lost update. 
    */
+  @Override
+  @RetrySemantics.Idempotent("No-op if already committed")
   public void commitTxn(CommitTxnRequest rqst)
     throws NoSuchTxnException, TxnAbortedException,  MetaException {
     long txnid = rqst.getTxnid();
@@ -622,9 +651,19 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
          */
         lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
         if (lockHandle == null) {
-          //this also ensures that txn is still there and in expected state 
(hasn't been timed out)
-          ensureValidTxn(dbConn, txnid, stmt);
+          //if here, txn was not found (in expected state)
+          TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
+          if(actualTxnStatus == TxnStatus.COMMITTED) {
+            /**
+             * This makes the operation idempotent
+             * (assume that this is most likely due to retry logic)
+             */
+            LOG.info("Nth commitTxn(" + JavaUtils.txnIdToString(txnid) + ") 
msg");
+            return;
+          }
+          raiseTxnUnexpectedState(actualTxnStatus, txnid);
           shouldNeverHappen(txnid);
+          //dbConn is rolled back in finally{}
         }
         String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + 
txnid + " and tc_operation_type IN(" +
           quoteChar(OpertaionType.UPDATE.sqlConst) + "," + 
quoteChar(OpertaionType.DELETE.sqlConst) + ")";
@@ -645,9 +684,17 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           }
           long commitId = commitIdRs.getLong(1);
           Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
+          /**
+           * "select distinct" is used below because
+           * 1. once we get to multi-statement txns, we only care to record 
that something was updated once
+           * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is 
retried by caller it may create
+           *  duplicate entries in TXN_COMPONENTS
+           * but we want to add a PK on WRITE_SET which won't have unique rows 
w/o this distinct
+           * even if it includes all of its columns
+           */
           int numCompsWritten = stmt.executeUpdate(
             "insert into WRITE_SET (ws_database, ws_table, ws_partition, 
ws_txnid, ws_commit_id, ws_operation_type)" +
-            " select tc_database, tc_table, tc_partition, tc_txnid, " + 
commitId + ", tc_operation_type " + conflictSQLSuffix);
+            " select distinct tc_database, tc_table, tc_partition, tc_txnid, " 
+ commitId + ", tc_operation_type " + conflictSQLSuffix);
           /**
            * see if there are any overlapping txns wrote the same element, 
i.e. have a conflict
            * Since entire commit operation is mutexed wrt other start/commit 
ops,
@@ -750,6 +797,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
   }
   @Override
+  @RetrySemantics.SafeToRetry
   public void performWriteSetGC() {
     Connection dbConn = null;
     Statement stmt = null;
@@ -798,7 +846,17 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * connection (but separate transactions).  This avoid some flakiness in 
BONECP where if you
    * perform an operation on 1 connection and immediately get another fron the 
pool, the 2nd one
    * doesn't see results of the first.
+   * 
+   * Retry-by-caller note: If the call to lock is from a transaction, then in 
the worst case
+   * there will be a duplicate set of locks but both sets will belong to the 
same txn so they 
+   * will not conflict with each other.  For locks w/o txn context (i.e. 
read-only query), this
+   * may lead to deadlock (at least a long wait).  (e.g. 1st call creates 
locks in {@code LOCK_WAITING}
+   * mode and response gets lost.  Then {@link 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient}
+   * retries, and enqueues another set of locks in LOCK_WAITING.  The 2nd 
LockResponse is delivered
+   * to the DbLockManager, which will keep doing {@link 
#checkLock(CheckLockRequest)} until the 1st
+   * set of locks times out.)
    */
+  @RetrySemantics.CannotRetry
   public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, 
TxnAbortedException, MetaException {
     ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst);
     try {
@@ -1063,7 +1121,12 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * {@link #checkLock(java.sql.Connection, long)}  must run at SERIALIZABLE 
(make sure some lock we are checking
    * against doesn't move from W to A in another txn) but this method can 
heartbeat in
    * separate txn at READ_COMMITTED.
+   * 
+   * Retry-by-caller note:
+   * Retryable because {@link #checkLock(Connection, long)} is
    */
+  @Override
+  @RetrySemantics.SafeToRetry
   public LockResponse checkLock(CheckLockRequest rqst)
     throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, 
MetaException {
     try {
@@ -1112,6 +1175,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will 
be locked as needed by db.
    * since this only removes from HIVE_LOCKS at worst some lock acquire is 
delayed
    */
+  @RetrySemantics.Idempotent
   public void unlock(UnlockRequest rqst)
     throws NoSuchLockException, TxnOpenException, MetaException {
     try {
@@ -1146,11 +1210,15 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
           if(info == null) {
             //didn't find any lock with extLockId but at ReadCommitted there 
is a possibility that
             //it existed when above delete ran but it didn't have the expected 
state.
-            LOG.error("No lock in " + LOCK_WAITING + " mode found for unlock(" 
+ rqst + ")");
-            throw new NoSuchLockException("No such lock " + 
JavaUtils.lockIdToString(extLockId));
+            LOG.info("No lock in " + LOCK_WAITING + " mode found for unlock(" +
+              JavaUtils.lockIdToString(rqst.getLockid()) + ")");
+            //bail here to make the operation idempotent
+            return;
           }
           if(info.txnId != 0) {
             String msg = "Unlocking locks associated with transaction not 
permitted.  " + info;
+            //if a lock is associated with a txn we can only "unlock" if 
it's in WAITING state
+            // which really means that the caller wants to give up waiting for 
the lock
             LOG.error(msg);
             throw new TxnOpenException(msg);
           }
@@ -1189,6 +1257,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       this.e = e;
     }
   }
+  @RetrySemantics.ReadOnly
   public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws 
MetaException {
     try {
       Connection dbConn = null;
@@ -1297,6 +1366,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * {@code ids} should only have txnid or lockid but not both, ideally.
    * Currently DBTxnManager.heartbeat() enforces this.
    */
+  @Override
+  @RetrySemantics.SafeToRetry
   public void heartbeat(HeartbeatRequest ids)
     throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, 
MetaException {
     try {
@@ -1318,7 +1389,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       heartbeat(ids);
     }
   }
-
+  @Override
+  @RetrySemantics.SafeToRetry
   public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest 
rqst)
     throws MetaException {
     try {
@@ -1379,6 +1451,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     return id;
   }
   @Override
+  @RetrySemantics.Idempotent
   public CompactionResponse compact(CompactionRequest rqst) throws 
MetaException {
     // Put a compaction request in the queue.
     try {
@@ -1503,6 +1576,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         return Character.toString(s);
     }
   }
+  @RetrySemantics.ReadOnly
   public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws 
MetaException {
     ShowCompactResponse response = new ShowCompactResponse(new 
ArrayList<ShowCompactResponseElement>());
     Connection dbConn = null;
@@ -1574,6 +1648,13 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
   }
 
+  /**
+   * Retry-by-caller note:
+   * This may be retried after dbConn.commit.  At worst, it will create 
duplicate entries in
+   * TXN_COMPONENTS which won't affect anything.  See more comments in {@link 
#commitTxn(CommitTxnRequest)}
+   */
+  @Override
+  @RetrySemantics.SafeToRetry
   public void addDynamicPartitions(AddDynamicPartitions rqst)
       throws NoSuchTxnException,  TxnAbortedException, MetaException {
     Connection dbConn = null;
@@ -1629,7 +1710,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   /**
    * Clean up corresponding records in metastore tables when corresponding 
object is dropped,
    * specifically: TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, 
COMPLETED_COMPACTIONS
+   * Retry-by-caller note: this is only idempotent assuming it's only called 
by dropTable/Db/etc
+   * operations.
    */
+  @Override
+  @RetrySemantics.Idempotent
   public void cleanupRecords(HiveObjectType type, Database db, Table table,
                              Iterator<Partition> partitionIterator) throws 
MetaException {
     try {
@@ -2322,6 +2407,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * If they happen to be for the same txnid, there will be a WW conflict (in 
MS DB), if different txnid,
    * checkLock() will in the worst case keep locks in Waiting state a little 
longer.
    */
+  @RetrySemantics.SafeToRetry("See @SafeToRetry")
   private LockResponse checkLock(Connection dbConn, long extLockId)
     throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, 
MetaException, SQLException {
     TxnStore.MutexAPI.LockHandle handle =  null;
@@ -2331,7 +2417,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     /**
      * todo: Longer term we should pass this from client somehow - this would 
be an optimization;  once
      * that is in place make sure to build and test "writeSet" below using 
OperationType not LockType
-     * With SP we assume that the query modifies exactly the partitions it 
locked.  (not entirely
+     * With Static Partitions we assume that the query modifies exactly the 
partitions it locked.  (not entirely
      * realistic since Update/Delete may have some predicate that filters out 
all records out of
      * some partition(s), but plausible).  For DP, we acquire locks very wide 
(all known partitions),
      * but for most queries only a fraction will actually be updated.  
#addDynamicPartitions() tells
@@ -2517,6 +2603,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
         // If we've found it and it's already been marked acquired,
         // then just look at the other locks.
         if (locks[index].state == LockState.ACQUIRED) {
+          /**this is what makes this method @SafeToRetry*/
           continue;
         }
 
@@ -2720,6 +2807,55 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
   }
 
+  /**
+   * Returns the state of the transaction iff it's able to determine it.  Some 
cases where it cannot:
+   * 1. txnid was Aborted/Committed and then GC'd (compacted)
+   * 2. txnid was committed but it didn't modify anything (nothing in 
COMPLETED_TXN_COMPONENTS)
+   */
+  private TxnStatus findTxnState(long txnid, Statement stmt) throws 
SQLException, MetaException {
+    String s = "select txn_state from TXNS where txn_id = " + txnid;
+    LOG.debug("Going to execute query <" + s + ">");
+    ResultSet rs = stmt.executeQuery(s);
+    if (!rs.next()) {
+      s = sqlGenerator.addLimitClause(1, "1 from COMPLETED_TXN_COMPONENTS 
where CTC_TXNID = " + txnid);
+      LOG.debug("Going to execute query <" + s + ">");
+      ResultSet rs2 = stmt.executeQuery(s);
+      if(rs2.next()) {
+        return TxnStatus.COMMITTED;
+      }
+      //could also check WRITE_SET but that seems overkill
+      return TxnStatus.UNKNOWN;
+    }
+    char txnState = rs.getString(1).charAt(0);
+    if (txnState == TXN_ABORTED) {
+      return TxnStatus.ABORTED;
+    }
+    assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, 
so must be OPEN";
+    return TxnStatus.OPEN;
+  }
+
+  /**
+   * Used to raise an informative error when the caller expected a txn in a 
particular TxnStatus
+   * but found it in some other status
+   */
+  private static void raiseTxnUnexpectedState(TxnStatus actualStatus, long 
txnid) 
+    throws NoSuchTxnException, TxnAbortedException {
+    switch (actualStatus) {
+      case ABORTED:
+        throw new TxnAbortedException("Transaction " + 
JavaUtils.txnIdToString(txnid) + " already aborted");
+      case COMMITTED:
+        throw new NoSuchTxnException("Transaction " + 
JavaUtils.txnIdToString(txnid) + " is already committed.");
+      case UNKNOWN:
+        throw new NoSuchTxnException("No such transaction " + 
JavaUtils.txnIdToString(txnid));
+      case OPEN:
+        throw new NoSuchTxnException(JavaUtils.txnIdToString(txnid) + " is " + 
TxnStatus.OPEN);
+      default:
+        throw new IllegalArgumentException("Unknown TxnStatus " + 
actualStatus);
+    }
+  }
+  /**
+   * Verifies that the transaction with {@code txnid} exists and is open; 
throws {@link NoSuchTxnException} or {@link TxnAbortedException} otherwise.
+   */
   private static void ensureValidTxn(Connection dbConn, long txnid, Statement 
stmt)
       throws SQLException, NoSuchTxnException, TxnAbortedException {
     // We need to check whether this transaction is valid and open
@@ -2734,7 +2870,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       //possible for multi-stmt txns
       boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0;
       LOG.debug("Going to rollback");
-      dbConn.rollback();
+      rollbackDBConn(dbConn);
       if(alreadyCommitted) {
         //makes the message more informative - helps to find bugs in client 
code
         throw new NoSuchTxnException("Transaction " + 
JavaUtils.txnIdToString(txnid) + " is already committed.");
@@ -2743,7 +2879,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
     if (rs.getString(1).charAt(0) == TXN_ABORTED) {
       LOG.debug("Going to rollback");
-      dbConn.rollback();
+      rollbackDBConn(dbConn);
       throw new TxnAbortedException("Transaction " + 
JavaUtils.txnIdToString(txnid) +
         " already aborted");//todo: add time of abort, which is not currently 
tracked.  Requires schema change
     }
@@ -2869,6 +3005,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
    * Will also delete locks which are not associated with a transaction and 
have timed out
    * Tries to keep transactions (against metastore db) small to reduce lock 
contention.
    */
+  @RetrySemantics.Idempotent
   public void performTimeOuts() {
     Connection dbConn = null;
     Statement stmt = null;
@@ -2939,7 +3076,8 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
       close(rs, stmt, dbConn);
     }
   }
-
+  @Override
+  @RetrySemantics.ReadOnly
   public void countOpenTxns() throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
@@ -3244,6 +3382,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
   }
   @Override
+  @RetrySemantics.Idempotent
   public MutexAPI getMutexAPI() {
     return this;
   }
@@ -3360,6 +3499,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
   }
   /**
    * Helper class that generates SQL queries with syntax specific to target DB
+   * todo: why throw MetaException?
    */
   @VisibleForTesting
   static final class SQLGenerator {
@@ -3459,7 +3599,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
     /**
      * Suppose you have a query "select a,b from T" and you want to limit the 
result set
-     * to the first 5 rows.  The mechanism to do that differs in different DB.
+     * to the first 5 rows.  The mechanism to do that differs in different DBs.
      * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will 
return the
      * appropriately modified row limiting query.
      *

http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 879ae55..041d55b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.txn;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 
@@ -68,6 +69,7 @@ public interface TxnStore {
    * @return information about open transactions
    * @throws MetaException
    */
+  @RetrySemantics.ReadOnly
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
 
   /**
@@ -75,12 +77,14 @@ public interface TxnStore {
    * @return list of open transactions, as well as a high water mark.
    * @throws MetaException
    */
+  @RetrySemantics.ReadOnly
   public GetOpenTxnsResponse getOpenTxns() throws MetaException;
 
   /**
    * Get the count for open transactions.
    * @throws MetaException
    */
+  @RetrySemantics.ReadOnly
   public void countOpenTxns() throws MetaException;
 
   /**
@@ -89,6 +93,7 @@ public interface TxnStore {
    * @return information on opened transactions
    * @throws MetaException
    */
+  @RetrySemantics.Idempotent
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
 
   /**
@@ -97,6 +102,7 @@ public interface TxnStore {
    * @throws NoSuchTxnException
    * @throws MetaException
    */
+  @RetrySemantics.Idempotent
   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, 
MetaException, TxnAbortedException;
 
   /**
@@ -105,6 +111,7 @@ public interface TxnStore {
    * @throws NoSuchTxnException
    * @throws MetaException
    */
+  @RetrySemantics.Idempotent
   public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, 
MetaException;
 
   /**
@@ -114,6 +121,7 @@ public interface TxnStore {
    * @throws TxnAbortedException
    * @throws MetaException
    */
+  @RetrySemantics.Idempotent
   public void commitTxn(CommitTxnRequest rqst)
     throws NoSuchTxnException, TxnAbortedException,  MetaException;
 
@@ -126,6 +134,7 @@ public interface TxnStore {
    * @throws TxnAbortedException
    * @throws MetaException
    */
+  @RetrySemantics.CannotRetry
   public LockResponse lock(LockRequest rqst)
     throws NoSuchTxnException, TxnAbortedException, MetaException;
 
@@ -139,6 +148,7 @@ public interface TxnStore {
    * @throws TxnAbortedException
    * @throws MetaException
    */
+  @RetrySemantics.SafeToRetry
   public LockResponse checkLock(CheckLockRequest rqst)
     throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, 
MetaException;
 
@@ -151,6 +161,7 @@ public interface TxnStore {
    * @throws TxnOpenException
    * @throws MetaException
    */
+  @RetrySemantics.Idempotent
   public void unlock(UnlockRequest rqst)
     throws NoSuchLockException, TxnOpenException, MetaException;
 
@@ -160,6 +171,7 @@ public interface TxnStore {
    * @return lock information.
    * @throws MetaException
    */
+  @RetrySemantics.ReadOnly
   public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws 
MetaException;
 
   /**
@@ -170,6 +182,7 @@ public interface TxnStore {
    * @throws TxnAbortedException
    * @throws MetaException
    */
+  @RetrySemantics.SafeToRetry
   public void heartbeat(HeartbeatRequest ids)
     throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, 
MetaException;
 
@@ -179,6 +192,7 @@ public interface TxnStore {
    * @return info on txns that were heartbeated
    * @throws MetaException
    */
+  @RetrySemantics.SafeToRetry
   public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest 
rqst)
     throws MetaException;
 
@@ -189,6 +203,7 @@ public interface TxnStore {
    * @return id of the compaction that has been started or existing id if this 
resource is already scheduled
    * @throws MetaException
    */
+  @RetrySemantics.Idempotent
   public CompactionResponse compact(CompactionRequest rqst) throws 
MetaException;
 
   /**
@@ -197,6 +212,7 @@ public interface TxnStore {
    * @return compaction information
    * @throws MetaException
    */
+  @RetrySemantics.ReadOnly
   public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws 
MetaException;
 
   /**
@@ -206,6 +222,7 @@ public interface TxnStore {
    * @throws TxnAbortedException
    * @throws MetaException
    */
+  @RetrySemantics.SafeToRetry
   public void addDynamicPartitions(AddDynamicPartitions rqst)
       throws NoSuchTxnException,  TxnAbortedException, MetaException;
 
@@ -217,12 +234,14 @@ public interface TxnStore {
    * @param partitionIterator partition iterator
    * @throws MetaException
    */
+  @RetrySemantics.Idempotent
   public void cleanupRecords(HiveObjectType type, Database db, Table table,
                              Iterator<Partition> partitionIterator) throws 
MetaException;
 
   /**
    * Timeout transactions and/or locks.  This should only be called by the 
compactor.
    */
+  @RetrySemantics.Idempotent
   public void performTimeOuts();
 
   /**
@@ -234,6 +253,7 @@ public interface TxnStore {
    * @return list of CompactionInfo structs.  These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
    */
+  @RetrySemantics.ReadOnly
   public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws 
MetaException;
 
   /**
@@ -242,6 +262,7 @@ public interface TxnStore {
    * @param cq_id id of this entry in the queue
    * @param user user to run the jobs as
    */
+  @RetrySemantics.Idempotent
   public void setRunAs(long cq_id, String user) throws MetaException;
 
   /**
@@ -250,6 +271,7 @@ public interface TxnStore {
    * @param workerId id of the worker calling this, will be recorded in the db
    * @return an info element for this compaction request, or null if there is 
no work to do now.
    */
+  @RetrySemantics.ReadOnly
   public CompactionInfo findNextToCompact(String workerId) throws 
MetaException;
 
   /**
@@ -257,6 +279,7 @@ public interface TxnStore {
    * and put it in the ready to clean state.
    * @param info info on the compaction entry to mark as compacted.
    */
+  @RetrySemantics.SafeToRetry
   public void markCompacted(CompactionInfo info) throws MetaException;
 
   /**
@@ -264,6 +287,7 @@ public interface TxnStore {
    * be cleaned.
    * @return information on the entry in the queue.
    */
+  @RetrySemantics.ReadOnly
   public List<CompactionInfo> findReadyToClean() throws MetaException;
 
   /**
@@ -272,6 +296,7 @@ public interface TxnStore {
    * 
    * @param info info on the compaction entry to remove
    */
+  @RetrySemantics.CannotRetry
   public void markCleaned(CompactionInfo info) throws MetaException;
 
   /**
@@ -281,6 +306,7 @@ public interface TxnStore {
    * @param info information on the compaction that failed.
    * @throws MetaException
    */
+  @RetrySemantics.CannotRetry
   public void markFailed(CompactionInfo info) throws MetaException;
 
   /**
@@ -288,6 +314,7 @@ public interface TxnStore {
    * txns exist can be that no work was done in this txn (e.g. Streaming 
opened TransactionBatch and
    * abandoned it w/o doing any work) or due to {@link 
#markCleaned(CompactionInfo)} being called.
    */
+  @RetrySemantics.SafeToRetry
   public void cleanEmptyAbortedTxns() throws MetaException;
 
   /**
@@ -299,6 +326,7 @@ public interface TxnStore {
    * @param hostname Name of this host.  It is assumed this prefixes the 
thread's worker id,
    *                 so that like hostname% will match the worker id.
    */
+  @RetrySemantics.Idempotent
   public void revokeFromLocalWorkers(String hostname) throws MetaException;
 
   /**
@@ -310,6 +338,7 @@ public interface TxnStore {
    * @param timeout number of milliseconds since start time that should elapse 
before a worker is
    *                declared dead.
    */
+  @RetrySemantics.Idempotent
   public void revokeTimedoutWorkers(long timeout) throws MetaException;
 
   /**
@@ -318,11 +347,13 @@ public interface TxnStore {
    * table level stats are examined.
    * @throws MetaException
    */
+  @RetrySemantics.ReadOnly
   public List<String> findColumnsWithStats(CompactionInfo ci) throws 
MetaException;
 
   /**
    * Record the highest txn id that the {@code ci} compaction job will pay 
attention to.
    */
+  @RetrySemantics.Idempotent
   public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) 
throws MetaException;
 
   /**
@@ -333,12 +364,14 @@ public interface TxnStore {
    * it's not recent.
    * @throws MetaException
    */
+  @RetrySemantics.SafeToRetry
   public void purgeCompactionHistory() throws MetaException;
 
   /**
    * WriteSet tracking is used to ensure proper transaction isolation.  This 
method deletes the 
    * transaction metadata once it becomes unnecessary.  
    */
+  @RetrySemantics.SafeToRetry
   public void performWriteSetGC();
 
   /**
@@ -349,6 +382,7 @@ public interface TxnStore {
    * @return true if it is ok to compact, false if there have been too many 
failures.
    * @throws MetaException
    */
+  @RetrySemantics.ReadOnly
   public boolean checkFailedCompactions(CompactionInfo ci) throws 
MetaException;
 
   @VisibleForTesting
@@ -357,6 +391,7 @@ public interface TxnStore {
   @VisibleForTesting
   long setTimeout(long milliseconds);
 
+  @RetrySemantics.Idempotent
   public MutexAPI getMutexAPI();
 
   /**
@@ -382,7 +417,7 @@ public interface TxnStore {
     public void acquireLock(String key, LockHandle handle) throws 
MetaException;
     public static interface LockHandle {
       /**
-       * Releases all locks associcated with this handle.
+       * Releases all locks associated with this handle.
        */
       public void releaseLocks();
     }
@@ -393,5 +428,6 @@ public interface TxnStore {
    * it calls this to update the metadata.
    * @param id {@link CompactionInfo#id}
    */
+  @RetrySemantics.Idempotent
   public void setHadoopJobId(String hadoopJobId, long id);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java 
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 11cedb9..adfe98a 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.txn;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
 import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -139,58 +140,69 @@ public class TestTxnHandler {
 
   @Test
   public void testAbortTxn() throws Exception {
-    OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(2, 
"me", "localhost"));
+    OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(3, 
"me", "localhost"));
     List<Long> txnList = openedTxns.getTxn_ids();
     long first = txnList.get(0);
     assertEquals(1L, first);
     long second = txnList.get(1);
     assertEquals(2L, second);
     txnHandler.abortTxn(new AbortTxnRequest(1));
+    List<String> parts = new ArrayList<String>();
+    parts.add("p=1");
+    AddDynamicPartitions adp = new AddDynamicPartitions(3, "default", "T", 
parts);
+    adp.setOperationType(DataOperationType.INSERT);
+    txnHandler.addDynamicPartitions(adp);
     GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
-    assertEquals(2L, txnsInfo.getTxn_high_water_mark());
-    assertEquals(2, txnsInfo.getOpen_txns().size());
+    assertEquals(3, txnsInfo.getTxn_high_water_mark());
+    assertEquals(3, txnsInfo.getOpen_txns().size());
     assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
     assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
     assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
     assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
+    assertEquals(3, txnsInfo.getOpen_txns().get(2).getId());
+    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(2).getState());
 
     GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
-    assertEquals(2L, txns.getTxn_high_water_mark());
-    assertEquals(2, txns.getOpen_txns().size());
-    boolean[] saw = new boolean[3];
+    assertEquals(3, txns.getTxn_high_water_mark());
+    assertEquals(3, txns.getOpen_txns().size());
+    boolean[] saw = new boolean[4];
     for (int i = 0; i < saw.length; i++) saw[i] = false;
     for (Long tid : txns.getOpen_txns()) {
       saw[tid.intValue()] = true;
     }
     for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
     txnHandler.commitTxn(new CommitTxnRequest(2));
+    //this succeeds as abortTxn is idempotent
+    txnHandler.abortTxn(new AbortTxnRequest(1));
     boolean gotException = false;
     try {
-      txnHandler.abortTxn(new AbortTxnRequest(1));
+      txnHandler.abortTxn(new AbortTxnRequest(2));
     }
-    catch(TxnAbortedException ex) {
+    catch(NoSuchTxnException ex) {
       gotException = true;
-      Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(1) + " 
already aborted", ex.getMessage());
+      //if this wasn't an empty txn, we'd get a better msg
+      Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), 
ex.getMessage());
     }
     Assert.assertTrue(gotException);
     gotException = false;
+    txnHandler.commitTxn(new CommitTxnRequest(3));
     try {
-      txnHandler.abortTxn(new AbortTxnRequest(2));
+      txnHandler.abortTxn(new AbortTxnRequest(3));
     }
     catch(NoSuchTxnException ex) {
       gotException = true;
-      //if this wasn't an empty txn, we'd get a better msg
-      //Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " 
already committed.", ex.getMessage());
-      Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), 
ex.getMessage());
+      //txn 3 is not empty txn, so we get a better msg
+      Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is 
already committed.", ex.getMessage());
     }
     Assert.assertTrue(gotException);
+    
     gotException = false;
     try {
-      txnHandler.abortTxn(new AbortTxnRequest(3));
+      txnHandler.abortTxn(new AbortTxnRequest(4));
     }
     catch(NoSuchTxnException ex) {
       gotException = true;
-      Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(3), 
ex.getMessage());
+      Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(4), 
ex.getMessage());
     }
     Assert.assertTrue(gotException);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5932d7e..af1f962 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -280,7 +280,7 @@ public class TestTxnCommands2 {
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b,c) " + 
makeValuesClause(moreTableData));
     List<String> rs0 = runStatementOnDriver("select a,b,c from " + 
Table.ACIDTBL + " where a > 0 order by a,b,c");
   }
-  @Ignore("not needed but useful for testing")
+//  @Ignore("not needed but useful for testing")
   @Test
   public void testNonAcidInsert() throws Exception {
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) 
values(1,2)");

http://git-wip-us.apache.org/repos/asf/hive/blob/1d7f4b75/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 8f26099..6c53538 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -47,6 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static junit.framework.Assert.assertEquals;
+
 /**
  * Unit tests for {@link DbTxnManager}.
  * See additional tests in {@link 
org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2}
@@ -229,15 +234,13 @@ public class TestDbTxnManager {
     exception = null;
     ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", 
HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 
TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 
TimeUnit.MILLISECONDS));
-    runReaper();
-    try {
-      txnMgr.rollbackTxn();
-    }
-    catch (LockException ex) {
-      exception = ex;
-    }
-    Assert.assertNotNull("Expected exception2", exception);
-    Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_ABORTED, 
exception.getCanonicalErrorMsg());
+    runReaper();//this will abort the txn
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+    assertEquals(2, txnsInfo.getTxn_high_water_mark());
+    assertEquals(2, txnsInfo.getOpen_txns().size());
+    Assert.assertEquals(TxnState.ABORTED, 
txnsInfo.getOpen_txns().get(1).getState());
+    txnMgr.rollbackTxn();//this is idempotent
   }
 
   @Test

Reply via email to