HBASE-9899 for idempotent operation dups, return the result instead of throwing conflict exception (Guanghao Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/975f0dd9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/975f0dd9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/975f0dd9

Branch: refs/heads/hbase-12439
Commit: 975f0dd958debcdd842a95f8e9f7458689414fbf
Parents: 550b937
Author: stack <st...@apache.org>
Authored: Thu Aug 4 12:40:19 2016 -0700
Committer: stack <st...@apache.org>
Committed: Thu Aug 4 12:40:19 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     | 57 ++++++++++++++++++++
 .../hadoop/hbase/regionserver/HRegion.java      | 53 ++++++++++++++----
 .../hbase/regionserver/RSRpcServices.java       | 55 +++++++++++--------
 .../hadoop/hbase/regionserver/Region.java       | 11 ++++
 .../hbase/regionserver/ServerNonceManager.java  | 43 +++++++++++++++
 .../hbase/client/HConnectionTestingUtility.java | 44 +++++++++++++++
 .../hadoop/hbase/client/TestFromClientSide.java | 45 ++++++++++++++++
 .../client/TestIncrementsFromClientSide.java    | 48 +++++++++++++++++
 .../hadoop/hbase/client/TestMultiParallel.java  | 14 +++--
 .../TestScannerHeartbeatMessages.java           |  2 +-
 10 files changed, 332 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
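The behavior change in brief: a client retry of an increment/append whose nonce the server has already seen is no longer answered with OperationConflictException; instead the server converts the duplicate into a Get pinned at the MVCC write point of the original operation and returns that Result. A minimal client-side sketch of the visible effect (not part of the patch; table, family, and qualifier names are illustrative, and the config mirrors the tests added below):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DuplicateIncrementSketch {
      public static void main(String[] args) throws IOException {
        Configuration c = HBaseConfiguration.create();
        // Shorter than the server-side delay used in the tests below, so the
        // first rpc times out and the client retries with the same nonce.
        c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
        try (Connection conn = ConnectionFactory.createConnection(c);
             Table table = conn.getTable(TableName.valueOf("t"))) {
          Increment inc = new Increment(Bytes.toBytes("row"));
          inc.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), 1L);
          // Before this patch a deduped retry could surface
          // OperationConflictException; with it, the retry returns the value
          // written by the first attempt and the counter moves exactly once.
          Result r = table.increment(inc);
        }
      }
    }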


http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index b3bf041..5ba0572 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -859,6 +859,63 @@ public final class ProtobufUtil {
   }
 
   /**
+   * Convert a protocol buffer Mutate (an Increment or Append) to a Get.
+   * @param proto the protocol buffer Mutate to convert.
+   * @param cellScanner the CellScanner that holds the mutation's cells, if any.
+   * @return the converted client Get.
+   * @throws IOException
+   */
+  public static Get toGet(final MutationProto proto, final CellScanner cellScanner)
+      throws IOException {
+    MutationType type = proto.getMutateType();
+    assert type == MutationType.INCREMENT || type == MutationType.APPEND : type.name();
+    byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
+    Get get = null;
+    int cellCount = proto.hasAssociatedCellCount() ? proto.getAssociatedCellCount() : 0;
+    if (cellCount > 0) {
+      // The proto has metadata only and the data is separate to be found in the cellScanner.
+      if (cellScanner == null) {
+        throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: "
+            + TextFormat.shortDebugString(proto));
+      }
+      for (int i = 0; i < cellCount; i++) {
+        if (!cellScanner.advance()) {
+          throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i
+              + " no cell returned: " + TextFormat.shortDebugString(proto));
+        }
+        Cell cell = cellScanner.current();
+        if (get == null) {
+          get = new Get(Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+        }
+        get.addColumn(
+          Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
+          Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+            cell.getQualifierLength()));
+      }
+    } else {
+      get = new Get(row);
+      for (ColumnValue column : proto.getColumnValueList()) {
+        byte[] family = column.getFamily().toByteArray();
+        for (QualifierValue qv : column.getQualifierValueList()) {
+          byte[] qualifier = qv.getQualifier().toByteArray();
+          if (!qv.hasValue()) {
+            throw new DoNotRetryIOException("Missing required field: qualifier value");
+          }
+          get.addColumn(family, qualifier);
+        }
+      }
+    }
+    if (proto.hasTimeRange()) {
+      TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+      get.setTimeRange(timeRange.getMin(), timeRange.getMax());
+    }
+    for (NameBytesPair attribute : proto.getAttributeList()) {
+      get.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
+    }
+    return get;
+  }
+
+  /**
    * Convert a client Scan to a protocol buffer Scan
    *
    * @param scan the client Scan to convert

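For reference, a client-side analogue of the new toGet helper (a sketch using only public client API; the committed helper works on the MutationProto and CellScanner wire forms instead): derive a Get that covers exactly the row and columns an Increment touched, so a duplicate can be answered by reading.

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Increment;

    public class ToGetSketch {
      static Get toGet(Increment inc) throws IOException {
        Get get = new Get(inc.getRow());
        // Ask for exactly the columns the increment touched.
        for (Map.Entry<byte[], List<Cell>> e : inc.getFamilyCellMap().entrySet()) {
          for (Cell cell : e.getValue()) {
            get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
          }
        }
        // Mirror the time range, as the committed helper does for the proto.
        get.setTimeRange(inc.getTimeRange().getMin(), inc.getTimeRange().getMax());
        return get;
      }
    }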
http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b7950df..86c02ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2617,6 +2617,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners)
       throws IOException {
+    return getScanner(scan, additionalScanners, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
+
+  private RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
+      long nonceGroup, long nonce) throws IOException {
     startRegionOperation(Operation.SCAN);
     try {
       // Verify families are all valid
@@ -2630,7 +2635,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           checkFamily(family);
         }
       }
-      return instantiateRegionScanner(scan, additionalScanners);
+      return instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce);
     } finally {
       closeRegionOperation(Operation.SCAN);
     }
@@ -2638,13 +2643,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   protected RegionScanner instantiateRegionScanner(Scan scan,
       List<KeyValueScanner> additionalScanners) throws IOException {
+    return instantiateRegionScanner(scan, additionalScanners, HConstants.NO_NONCE,
+      HConstants.NO_NONCE);
+  }
+
+  protected RegionScanner instantiateRegionScanner(Scan scan,
+      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
     if (scan.isReversed()) {
       if (scan.getFilter() != null) {
         scan.getFilter().setReversed(true);
       }
       return new ReversedRegionScannerImpl(scan, additionalScanners, this);
     }
-    return new RegionScannerImpl(scan, additionalScanners, this);
+    return new RegionScannerImpl(scan, additionalScanners, this, nonceGroup, nonce);
   }
 
   @Override
@@ -5592,6 +5603,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
         throws IOException {
+      this(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    }
+
+    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
+        long nonceGroup, long nonce) throws IOException {
       this.region = region;
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
@@ -5621,15 +5637,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // getSmallestReadPoint, before scannerReadPoints is updated.
       IsolationLevel isolationLevel = scan.getIsolationLevel();
       synchronized(scannerReadPoints) {
-        this.readPt = getReadPoint(isolationLevel);
+        if (nonce == HConstants.NO_NONCE || rsServices == null
+            || rsServices.getNonceManager() == null) {
+          this.readPt = getReadPoint(isolationLevel);
+        } else {
+          this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
+        }
         scannerReadPoints.put(this, this.readPt);
       }
 
+      initializeScanners(scan, additionalScanners);
+    }
+
+    protected void initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
+        throws IOException {
       // Here we separate all scanners into two lists - scanners that provide data required
       // by the filter to operate (scanners list) and all others (joinedScanners list).
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
-      List<KeyValueScanner> joinedScanners
-        = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
+      List<KeyValueScanner> joinedScanners =
+          new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
       // Store all already instantiated scanners for exception handling
       List<KeyValueScanner> instantiatedScanners = new ArrayList<KeyValueScanner>();
       // handle additionalScanners
@@ -6795,15 +6821,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
   }
 
-   void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException {
+  void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException {
     checkRow(get.getRow(), "Get");
     // Verify families are all valid
     if (get.hasFamilies()) {
-      for (byte [] family: get.familySet()) {
+      for (byte[] family : get.familySet()) {
         checkFamily(family);
       }
     } else { // Adding all families to scanner
-      for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
+      for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
         get.addFamily(family);
       }
     }
@@ -6811,7 +6837,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
+    return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
 
+  @Override
+  public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
+      throws IOException {
     List<Cell> results = new ArrayList<Cell>();
 
     // pre-get CP hook
@@ -6825,7 +6856,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     RegionScanner scanner = null;
     try {
-      scanner = getScanner(scan);
+      scanner = getScanner(scan, null, nonceGroup, nonce);
       scanner.next(results);
     } finally {
       if (scanner != null)
@@ -7168,6 +7199,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId);
         }
         mvcc.completeAndWait(writeEntry);
+        if (rsServices != null && rsServices.getNonceManager() != null) {
+          rsServices.getNonceManager().addMvccToOperationContext(nonceGroup, nonce,
+            writeEntry.getWriteNumber());
+        }
         writeEntry = null;
       } finally {
         this.updatesLock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 9cfc5df..f9b78e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -73,7 +73,6 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
-import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -426,11 +425,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * Starts the nonce operation for a mutation, if needed.
    * @param mutation Mutation.
    * @param nonceGroup Nonce group from the request.
-   * @returns Nonce used (can be NO_NONCE).
+   * @return whether to proceed with this mutation.
    */
-  private long startNonceOperation(final MutationProto mutation, long nonceGroup)
-      throws IOException, OperationConflictException {
-    if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
+  private boolean startNonceOperation(final MutationProto mutation, long nonceGroup)
+      throws IOException {
+    if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
     boolean canProceed = false;
     try {
       canProceed = regionServer.nonceManager.startOperation(
@@ -438,14 +437,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     } catch (InterruptedException ex) {
       throw new InterruptedIOException("Nonce start operation interrupted");
     }
-    if (!canProceed) {
-      // TODO: instead, we could convert append/increment to get w/mvcc
-      String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
-        + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
-        + "] may have already completed";
-      throw new OperationConflictException(message);
-    }
-    return mutation.getNonce();
+    return canProceed;
   }
 
   /**
@@ -614,23 +606,33 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * bypassed as indicated by RegionObserver, null otherwise
    * @throws IOException
    */
-  private Result append(final Region region, final OperationQuota quota, final MutationProto m,
-      final CellScanner cellScanner, long nonceGroup) throws IOException {
+  private Result append(final Region region, final OperationQuota quota,
+      final MutationProto mutation, final CellScanner cellScanner, long nonceGroup)
+      throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
-    Append append = ProtobufUtil.toAppend(m, cellScanner);
+    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
     quota.addMutation(append);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preAppend(append);
     }
     if (r == null) {
-      long nonce = startNonceOperation(m, nonceGroup);
+      boolean canProceed = startNonceOperation(mutation, nonceGroup);
       boolean success = false;
       try {
-        r = region.append(append, nonceGroup, nonce);
+        if (canProceed) {
+          r = region.append(append, nonceGroup, mutation.getNonce());
+        } else {
+          // convert duplicate append to get
+          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false,
+            nonceGroup, mutation.getNonce());
+          r = Result.create(results);
+        }
         success = true;
       } finally {
-        endNonceOperation(m, nonceGroup, success);
+        if (canProceed) {
+          endNonceOperation(mutation, nonceGroup, success);
+        }
       }
       if (region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postAppend(append, r);
@@ -662,13 +664,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       r = region.getCoprocessorHost().preIncrement(increment);
     }
     if (r == null) {
-      long nonce = startNonceOperation(mutation, nonceGroup);
+      boolean canProceed = startNonceOperation(mutation, nonceGroup);
       boolean success = false;
       try {
-        r = region.increment(increment, nonceGroup, nonce);
+        if (canProceed) {
+          r = region.increment(increment, nonceGroup, mutation.getNonce());
+        } else {
+          // convert duplicate increment to get
+          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup,
+            mutation.getNonce());
+          r = Result.create(results);
+        }
         success = true;
       } finally {
-        endNonceOperation(mutation, nonceGroup, success);
+        if (canProceed) {
+          endNonceOperation(mutation, nonceGroup, success);
+        }
       }
       if (region.getCoprocessorHost() != null) {
         r = region.getCoprocessorHost().postIncrement(increment, r);

http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 9b1f82a..efd68b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -395,6 +395,17 @@ public interface Region extends ConfigurationObserver {
   List<Cell> get(Get get, boolean withCoprocessor) throws IOException;
 
   /**
+   * Do a get for a duplicate of a non-idempotent operation.
+   * @param get query parameters.
+   * @param withCoprocessor invoke coprocessor or not.
+   * @param nonceGroup Nonce group.
+   * @param nonce Nonce.
+   * @return list of cells resulting from the operation
+   * @throws IOException
+   */
+  List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce) throws IOException;
+
+  /**
    * Return an iterator that scans over the HRegion, returning the indicated
    * columns and rows specified by the {@link Scan}.
    * <p>

http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
index b2b656b..459b69a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
@@ -62,6 +62,8 @@ public class ServerNonceManager {
     private static final long WAITING_BIT = 4;
     private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;
 
+    private long mvcc;
+
     @Override
     public String toString() {
       return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
@@ -98,6 +100,14 @@ public class ServerNonceManager {
       return getActivityTime() < (minRelevantTime & (~0l >>> 3));
     }
 
+    public void setMvcc(long mvcc) {
+      this.mvcc = mvcc;
+    }
+
+    public long getMvcc() {
+      return this.mvcc;
+    }
+
     private long getActivityTime() {
       return this.data >>> 3;
     }
@@ -192,6 +202,39 @@ public class ServerNonceManager {
   }
 
   /**
+   * Store the write point in the OperationContext when the operation succeeds.
+   * @param group Nonce group.
+   * @param nonce Nonce.
+   * @param mvcc Write point of the succeeded operation.
+   */
+  public void addMvccToOperationContext(long group, long nonce, long mvcc) {
+    if (nonce == HConstants.NO_NONCE) {
+      return;
+    }
+    NonceKey nk = new NonceKey(group, nonce);
+    OperationContext result = nonces.get(nk);
+    assert result != null;
+    synchronized (result) {
+      result.setMvcc(mvcc);
+    }
+  }
+
+  /**
+   * Return the write point of the previously succeeded operation.
+   * @param group Nonce group.
+   * @param nonce Nonce.
+   * @return write point of the previously succeeded operation.
+   */
+  public long getMvccFromOperationContext(long group, long nonce) {
+    if (nonce == HConstants.NO_NONCE) {
+      return Long.MAX_VALUE;
+    }
+    NonceKey nk = new NonceKey(group, nonce);
+    OperationContext result = nonces.get(nk);
+    return result == null ? Long.MAX_VALUE : result.getMvcc();
+  }
+
+  /**
    * Reports the operation from WAL during replay.
    * @param group Nonce group.
    * @param nonce Nonce.

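The two methods above form the round trip at the heart of the fix: after a successful increment/append commits, HRegion records the WriteEntry's write number against the operation's nonce (addMvccToOperationContext); when a duplicate of that operation arrives, the region scanner pins its read point to the recorded number (getMvccFromOperationContext), so the converted Get observes exactly the state the original mutation produced. A toy model of the idea (illustrative only; it keys on the nonce alone, while the real ServerNonceManager keys on NonceKey and stores the value inside its OperationContext entries):

    import java.util.concurrent.ConcurrentHashMap;

    public class NonceMvccSketch {
      private final ConcurrentHashMap<Long, Long> mvccByNonce = new ConcurrentHashMap<>();

      // After the original operation commits (cf. addMvccToOperationContext).
      void record(long nonce, long writeNumber) {
        mvccByNonce.put(nonce, writeNumber);
      }

      // When serving a duplicate (cf. getMvccFromOperationContext);
      // Long.MAX_VALUE means "nothing recorded, read the latest state",
      // matching the patch's fallback.
      long readPointFor(long nonce) {
        return mvccByNonce.getOrDefault(nonce, Long.MAX_VALUE);
      }
    }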
http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index c8ccd2a..265e3c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.RegionLocations;
@@ -28,6 +29,10 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.util.Threads;
 import org.mockito.Mockito;
 
@@ -150,4 +155,43 @@ public class HConnectionTestingUtility {
       Mockito.spy(new ConnectionImplementation(conf, null, null));
     return connection;
   }
+
+  /**
+   * This coprocessor sleeps 2s at the first increment/append rpc call.
+   */
+  public static class SleepAtFirstRpcCall extends BaseRegionObserver {
+    static final AtomicLong ct = new AtomicLong(0);
+    static final String SLEEP_TIME_CONF_KEY =
+        "hbase.coprocessor.SleepAtFirstRpcCall.sleepTime";
+    static final long DEFAULT_SLEEP_TIME = 2000;
+    static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
+
+    public SleepAtFirstRpcCall() {
+    }
+
+    @Override
+    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
+      RegionCoprocessorEnvironment env = c.getEnvironment();
+      Configuration conf = env.getConfiguration();
+      sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
+    }
+
+    @Override
+    public Result postIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Increment increment, final Result result) throws IOException {
+      if (ct.incrementAndGet() == 1) {
+        Threads.sleep(sleepTime.get());
+      }
+      return result;
+    }
+
+    @Override
+    public Result postAppend(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Append append, final Result result) throws IOException {
+      if (ct.incrementAndGet() == 1) {
+        Threads.sleep(sleepTime.get());
+      }
+      return result;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index ca4b609..bc94b02 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -169,6 +170,50 @@ public class TestFromClientSide {
   }
 
   /**
+   * Test append result when there are duplicate rpc requests.
+   */
+  @Test
+  public void testDuplicateAppend() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDuplicateAppend");
+    Map<String, String> kvs = new HashMap<String, String>();
+    kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
+    hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
+    TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+    // Client will retry because rpc timeout is smaller than the sleep time of the first rpc call
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+    Connection connection = ConnectionFactory.createConnection(c);
+    Table t = connection.getTable(TableName.valueOf("HCM-testDuplicateAppend"));
+    if (t instanceof HTable) {
+      HTable table = (HTable) t;
+      table.setOperationTimeout(3 * 1000);
+
+      try {
+        Append append = new Append(ROW);
+        append.add(TEST_UTIL.fam1, QUALIFIER, VALUE);
+        Result result = table.append(append);
+
+        // Verify expected result
+        Cell[] cells = result.rawCells();
+        assertEquals(1, cells.length);
+        assertKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, VALUE);
+
+        // Verify expected result again
+        Result readResult = table.get(new Get(ROW));
+        cells = readResult.rawCells();
+        assertEquals(1, cells.length);
+        assertKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, VALUE);
+      } finally {
+        table.close();
+        connection.close();
+      }
+    }
+  }
+
+  /**
    * Basic client side validation of HBASE-4536
    */
    @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
index 6b4ee89..3ddfef4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +31,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@@ -56,6 +60,7 @@ public class TestIncrementsFromClientSide {
   protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static byte [] ROW = Bytes.toBytes("testRow");
   private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   // This test depends on there being only one slave running at a time. See the @Before
   // method where we do rolling restart.
   protected static int SLAVES = 1;
@@ -79,6 +84,49 @@ public class TestIncrementsFromClientSide {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  /**
+   * Test increment result when there are duplicate rpc requests.
+   */
+  @Test
+  public void testDuplicateIncrement() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDuplicateIncrement");
+    Map<String, String> kvs = new HashMap<String, String>();
+    kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000");
+    hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(), null, 1, kvs);
+    TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+    // Client will retry because rpc timeout is smaller than the sleep time of the first rpc call
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+    Connection connection = ConnectionFactory.createConnection(c);
+    Table t = connection.getTable(TableName.valueOf("HCM-testDuplicateIncrement"));
+    if (t instanceof HTable) {
+      HTable table = (HTable) t;
+      table.setOperationTimeout(3 * 1000);
+
+      try {
+        Increment inc = new Increment(ROW);
+        inc.addColumn(TEST_UTIL.fam1, QUALIFIER, 1);
+        Result result = table.increment(inc);
+
+        Cell [] cells = result.rawCells();
+        assertEquals(1, cells.length);
+        assertIncrementKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, 1);
+
+        // Verify expected result
+        Result readResult = table.get(new Get(ROW));
+        cells = readResult.rawCells();
+        assertEquals(1, cells.length);
+        assertIncrementKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, 1);
+      } finally {
+        table.close();
+        connection.close();
+      }
+    }
+  }
+
   @Test
   public void testIncrementWithDeletes() throws Exception {
     LOG.info("Starting " + this.name.getMethodName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 67ac51e..b1ad172 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -524,16 +523,16 @@ public class TestMultiParallel {
       Increment inc = new Increment(ONE_ROW);
       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
       table.increment(inc);
+
+      // duplicate increment
       inc = new Increment(ONE_ROW);
       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
-      try {
-        table.increment(inc);
-        fail("Should have thrown an exception");
-      } catch (OperationConflictException ex) {
-      }
+      Result result = table.increment(inc);
+      validateResult(result, QUALIFIER, Bytes.toBytes(1L));
+
       Get get = new Get(ONE_ROW);
       get.addColumn(BYTES_FAMILY, QUALIFIER);
-      Result result = table.get(get);
+      result = table.get(get);
       validateResult(result, QUALIFIER, Bytes.toBytes(1L));
 
       // Now run a bunch of requests in parallel, exactly half should succeed.
@@ -561,7 +560,6 @@ public class TestMultiParallel {
             }
             try {
               table.increment(inc);
-            } catch (OperationConflictException ex) { // Some threads are expected to fail.
             } catch (IOException ioEx) {
               fail("Not expected");
             }

http://git-wip-us.apache.org/repos/asf/hbase/blob/975f0dd9/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index 54bee94..b906e84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -489,7 +489,7 @@ public class TestScannerHeartbeatMessages {
     // Instantiate the custom heartbeat region scanners
     @Override
     protected RegionScanner instantiateRegionScanner(Scan scan,
-        List<KeyValueScanner> additionalScanners) throws IOException {
+        List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
       if (scan.isReversed()) {
         if (scan.getFilter() != null) {
           scan.getFilter().setReversed(true);
