http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index fbd9f51..1b3e111 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -18,12 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -43,7 +37,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -74,6 +67,16 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
+import com.google.common.annotations.VisibleForTesting;
+
+// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS 
ONLY.
+// Internally, we use shaded protobuf. This below are part of our public API.
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.Service;
+// SEE ABOVE NOTE!
+
 /**
  * An implementation of {@link Table}. Used to communicate with a single HBase 
table.
  * Lightweight. Get as needed and just close when done.
@@ -411,23 +414,16 @@ public class HTable implements Table {
 
     if (get.getConsistency() == Consistency.STRONG) {
       // Good old call.
-      final Get getReq = get;
+      final Get configuredGet = get;
       RegionServerCallable<Result> callable = new 
RegionServerCallable<Result>(this.connection,
-          getName(), get.getRow()) {
+          this.rpcControllerFactory, getName(), get.getRow()) {
         @Override
-        public Result call(int callTimeout) throws IOException {
-          ClientProtos.GetRequest request =
-            
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), 
getReq);
-          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            ClientProtos.GetResponse response = getStub().get(controller, 
request);
-            if (response == null) return null;
-            return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
+        protected Result call(PayloadCarryingRpcController controller) throws 
Exception {
+          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
+              getLocation().getRegionInfo().getRegionName(), configuredGet);
+          ClientProtos.GetResponse response = getStub().get(controller, 
request);
+          if (response == null) return null;
+          return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
         }
       };
       return 
rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
@@ -443,7 +439,6 @@ public class HTable implements Table {
     return callable.call(operationTimeout);
   }
 
-
   /**
    * {@inheritDoc}
    */
@@ -454,16 +449,14 @@ public class HTable implements Table {
     }
     try {
       Object[] r1 = new Object[gets.size()];
-      batch((List) gets, r1);
-
-      // translate.
+      batch((List<? extends Row>)gets, r1);
+      // Translate.
       Result [] results = new Result[r1.length];
-      int i=0;
-      for (Object o : r1) {
-        // batch ensures if there is a failure we get an exception instead
-        results[i++] = (Result) o;
+      int i = 0;
+      for (Object obj: r1) {
+        // Batch ensures if there is a failure we get an exception instead
+        results[i++] = (Result)obj;
       }
-
       return results;
     } catch (InterruptedException e) {
       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
@@ -511,21 +504,13 @@ public class HTable implements Table {
   public void delete(final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(connection,
-        tableName, delete.getRow()) {
+        this.rpcControllerFactory, getName(), delete.getRow()) {
       @Override
-      public Boolean call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-        controller.setPriority(tableName);
-        controller.setCallTimeout(callTimeout);
-
-        try {
-          MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), delete);
-          MutateResponse response = getStub().mutate(controller, request);
-          return Boolean.valueOf(response.getProcessed());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), delete);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
       }
     };
     rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
@@ -581,41 +566,28 @@ public class HTable implements Table {
    */
   @Override
   public void mutateRow(final RowMutations rm) throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
     PayloadCarryingServerCallable<MultiResponse> callable =
-      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), 
rm.getRow(),
+      new PayloadCarryingServerCallable<MultiResponse>(this.connection, 
getName(), rm.getRow(),
           rpcControllerFactory) {
-        @Override
-        public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
-          controller.setPriority(tableName);
-          int remainingTime = tracker.getRemainingTime(callTimeout);
-          if (remainingTime == 0) {
-            throw new DoNotRetryIOException("Timeout for mutate row");
-          }
-          controller.setCallTimeout(remainingTime);
-          try {
-            RegionAction.Builder regionMutationBuilder = 
RequestConverter.buildRegionAction(
-                getLocation().getRegionInfo().getRegionName(), rm);
-            regionMutationBuilder.setAtomic(true);
-            MultiRequest request =
-                
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-            ClientProtos.MultiResponse response = getStub().multi(controller, 
request);
-            ClientProtos.RegionActionResult res = 
response.getRegionActionResultList().get(0);
-            if (res.hasException()) {
-              Throwable ex = ProtobufUtil.toException(res.getException());
-              if (ex instanceof IOException) {
-                throw (IOException) ex;
-              }
-              throw new IOException("Failed to mutate row: " +
-                  Bytes.toStringBinary(rm.getRow()), ex);
-            }
-            return ResponseConverter.getResults(request, response, 
controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+      @Override
+      protected MultiResponse call(PayloadCarryingRpcController controller) 
throws Exception {
+        RegionAction.Builder regionMutationBuilder = 
RequestConverter.buildRegionAction(
+            getLocation().getRegionInfo().getRegionName(), rm);
+        regionMutationBuilder.setAtomic(true);
+        MultiRequest request =
+            
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+        ClientProtos.MultiResponse response = getStub().multi(controller, 
request);
+        ClientProtos.RegionActionResult res = 
response.getRegionActionResultList().get(0);
+        if (res.hasException()) {
+          Throwable ex = ProtobufUtil.toException(res.getException());
+          if (ex instanceof IOException) {
+            throw (IOException) ex;
           }
+          throw new IOException("Failed to mutate row: " + 
Bytes.toStringBinary(rm.getRow()), ex);
         }
-      };
+        return ResponseConverter.getResults(request, response, 
controller.cellScanner());
+      }
+    };
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, 
rm.getMutations(),
         null, null, callable, operationTimeout);
     ars.waitUntilDone();
@@ -624,38 +596,31 @@ public class HTable implements Table {
     }
   }
 
+  private static void checkHasFamilies(final Mutation mutation) throws 
IOException {
+    if (mutation.numFamilies() == 0) {
+      throw new IOException("Invalid arguments to " + mutation + ", zero 
columns specified");
+    }
+  }
+
   /**
    * {@inheritDoc}
    */
   @Override
   public Result append(final Append append) throws IOException {
-    if (append.numFamilies() == 0) {
-      throw new IOException(
-          "Invalid arguments to append, no columns specified");
-    }
-
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Result> callable =
-      new RegionServerCallable<Result>(this.connection, getName(), 
append.getRow()) {
-        @Override
-        public Result call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), append, 
nonceGroup, nonce);
-            MutateResponse response = getStub().mutate(controller, request);
-            if (!response.hasResult()) return null;
-            return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Result> 
newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    checkHasFamilies(append);
+    RegionServerCallable<Result> callable = new 
RegionServerCallable<Result>(this.connection,
+        this.rpcControllerFactory, getName(), append.getRow()) {
+      @Override
+      protected Result call(PayloadCarryingRpcController controller) throws 
Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), append, 
getNonceGroup(), getNewNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        if (!response.hasResult()) return null;
+        return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+      }
+    };
+    return rpcCallerFactory.<Result> newCaller(this.rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -663,27 +628,16 @@ public class HTable implements Table {
    */
   @Override
   public Result increment(final Increment increment) throws IOException {
-    if (!increment.hasFamilies()) {
-      throw new IOException(
-          "Invalid arguments to increment, no columns specified");
-    }
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
+    checkHasFamilies(increment);
     RegionServerCallable<Result> callable = new 
RegionServerCallable<Result>(this.connection,
-        getName(), increment.getRow()) {
+        this.rpcControllerFactory, getName(), increment.getRow()) {
       @Override
-      public Result call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(callTimeout);
-        try {
-          MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), increment, 
nonceGroup, nonce);
-          MutateResponse response = getStub().mutate(controller, request);
-          return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+      protected Result call(PayloadCarryingRpcController controller) throws 
Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), increment, 
getNonceGroup(), getNewNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        // Should this check for null like append does?
+        return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
       }
     };
     return rpcCallerFactory.<Result> 
newCaller(rpcTimeout).callWithRetries(callable,
@@ -722,28 +676,20 @@ public class HTable implements Table {
 
     NonceGenerator ng = this.connection.getNonceGenerator();
     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Long> callable =
-      new RegionServerCallable<Long>(connection, getName(), row) {
-        @Override
-        public Long call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildIncrementRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family,
-              qualifier, amount, durability, nonceGroup, nonce);
-            MutateResponse response = getStub().mutate(controller, request);
-            Result result =
-              ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
-            return Long.valueOf(Bytes.toLong(result.getValue(family, 
qualifier)));
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Long> 
newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    RegionServerCallable<Long> callable = new 
RegionServerCallable<Long>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Long call(PayloadCarryingRpcController controller) throws 
Exception {
+        MutateRequest request = RequestConverter.buildIncrementRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family,
+          qualifier, amount, durability, nonceGroup, nonce);
+        MutateResponse response = getStub().mutate(controller, request);
+        Result result = ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+      }
+    };
+    return rpcCallerFactory.<Long> newCaller(rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -754,26 +700,19 @@ public class HTable implements Table {
       final byte [] family, final byte [] qualifier, final byte [] value,
       final Put put)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-                getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-                new BinaryComparator(value), CompareType.EQUAL, put);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> 
newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+          new BinaryComparator(value), CompareType.EQUAL, put);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -784,57 +723,42 @@ public class HTable implements Table {
       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
       final Put put)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = new 
PayloadCarryingRpcController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-                new BinaryComparator(value), compareType, put);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> 
newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
+        CompareType compareType = CompareType.valueOf(compareOp.name());
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+          new BinaryComparator(value), compareType, put);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean checkAndDelete(final byte [] row,
-      final byte [] family, final byte [] qualifier, final byte [] value,
-      final Delete delete)
+  public boolean checkAndDelete(final byte [] row, final byte [] family, final 
byte [] qualifier,
+      final byte [] value, final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-                new BinaryComparator(value), CompareType.EQUAL, delete);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> 
newCaller(rpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+          new BinaryComparator(value), CompareType.EQUAL, delete);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -845,25 +769,18 @@ public class HTable implements Table {
       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
       final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-                new BinaryComparator(value), compareType, delete);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
+    RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(this.connection,
+        this.rpcControllerFactory, getName(), row) {
+      @Override
+      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
+        CompareType compareType = CompareType.valueOf(compareOp.name());
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+          new BinaryComparator(value), compareType, delete);
+        MutateResponse response = getStub().mutate(controller, request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
     return rpcCallerFactory.<Boolean> 
newCaller(rpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
@@ -875,40 +792,28 @@ public class HTable implements Table {
   public boolean checkAndMutate(final byte [] row, final byte [] family, final 
byte [] qualifier,
     final CompareOp compareOp, final byte [] value, final RowMutations rm)
     throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
     PayloadCarryingServerCallable<MultiResponse> callable =
       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), 
rm.getRow(),
         rpcControllerFactory) {
         @Override
-        public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
-          controller.setPriority(tableName);
-          int remainingTime = tracker.getRemainingTime(callTimeout);
-          if (remainingTime == 0) {
-            throw new DoNotRetryIOException("Timeout for mutate row");
-          }
-          controller.setCallTimeout(remainingTime);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MultiRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-              new BinaryComparator(value), compareType, rm);
-            ClientProtos.MultiResponse response = getStub().multi(controller, 
request);
-            ClientProtos.RegionActionResult res = 
response.getRegionActionResultList().get(0);
-            if (res.hasException()) {
-              Throwable ex = ProtobufUtil.toException(res.getException());
-              if(ex instanceof IOException) {
-                throw (IOException)ex;
-              }
-              throw new IOException("Failed to checkAndMutate row: "+
-                                    Bytes.toStringBinary(rm.getRow()), ex);
+        protected MultiResponse call(PayloadCarryingRpcController controller) 
throws Exception {
+          CompareType compareType = CompareType.valueOf(compareOp.name());
+          MultiRequest request = RequestConverter.buildMutateRequest(
+            getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+            new BinaryComparator(value), compareType, rm);
+          ClientProtos.MultiResponse response = getStub().multi(controller, 
request);
+          ClientProtos.RegionActionResult res = 
response.getRegionActionResultList().get(0);
+          if (res.hasException()) {
+            Throwable ex = ProtobufUtil.toException(res.getException());
+            if (ex instanceof IOException) {
+              throw (IOException)ex;
             }
-            return ResponseConverter.getResults(request, response, 
controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+            throw new IOException("Failed to checkAndMutate row: "+ 
Bytes.toStringBinary(rm.getRow()), ex);
           }
+          return ResponseConverter.getResults(request, response, 
controller.cellScanner());
         }
       };
+
     /**
      *  Currently, we use one array to store 'processed' flag which is 
returned by server.
      *  It is excessive to send such a large array, but that is required by 
the framework right now
@@ -968,7 +873,6 @@ public class HTable implements Table {
   }
 
   /**
-   * {@inheritDoc}
    * @throws IOException
    */
   void flushCommits() throws IOException {
@@ -1145,19 +1049,18 @@ public class HTable implements Table {
     for (final byte[] r : keys) {
       final RegionCoprocessorRpcChannel channel =
           new RegionCoprocessorRpcChannel(connection, tableName, r);
-      Future<R> future = pool.submit(
-          new Callable<R>() {
-            @Override
-            public R call() throws Exception {
-              T instance = ProtobufUtil.newServiceStub(service, channel);
-              R result = callable.call(instance);
-              byte[] region = channel.getLastRegion();
-              if (callback != null) {
-                callback.update(region, r, result);
-              }
-              return result;
-            }
-          });
+      Future<R> future = pool.submit(new Callable<R>() {
+        @Override
+        public R call() throws Exception {
+          T instance = ProtobufUtil.newServiceStub(service, channel);
+          R result = callable.call(instance);
+          byte[] region = channel.getLastRegion();
+          if (callback != null) {
+            callback.update(region, r, result);
+          }
+          return result;
+        }
+      });
       futures.put(r, future);
     }
     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
@@ -1210,9 +1113,6 @@ public class HTable implements Table {
     return tableName + ";" + connection;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <R extends Message> Map<byte[], R> batchCoprocessorService(
       Descriptors.MethodDescriptor methodDescriptor, Message request,
@@ -1221,14 +1121,13 @@ public class HTable implements Table {
         Bytes.BYTES_COMPARATOR));
     batchCoprocessorService(methodDescriptor, request, startKey, endKey, 
responsePrototype,
         new Callback<R>() {
-
-          @Override
-          public void update(byte[] region, byte[] row, R result) {
-            if (region != null) {
-              results.put(region, result);
-            }
-          }
-        });
+      @Override
+      public void update(byte[] region, byte[] row, R result) {
+        if (region != null) {
+          results.put(region, result);
+        }
+      }
+    });
     return results;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
index 66d3c21..ae62255 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
@@ -21,16 +21,24 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+
 /**
  * A RetryingCallable for master operations.
  * @param <V> return type
  */
+// Like RegionServerCallable
 abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
   protected ClusterConnection connection;
   protected MasterKeepAliveConnection master;
+  private final PayloadCarryingRpcController rpcController;
 
-  public MasterCallable(final Connection connection) {
+  MasterCallable(final Connection connection,
+      final RpcControllerFactory rpcConnectionFactory) {
     this.connection = (ClusterConnection) connection;
+    this.rpcController = rpcConnectionFactory.newController();
   }
 
   @Override
@@ -59,4 +67,31 @@ abstract class MasterCallable<V> implements 
RetryingCallable<V>, Closeable {
   public long sleep(long pause, int tries) {
     return ConnectionUtils.getPauseTime(pause, tries);
   }
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link 
IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add 
your rpc invocation.
+   */
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste 
so much boilerplate
+  // and so we contain references to protobuf. We can't set priority on the 
rpcController as
+  // we do in RegionServerCallable because we don't always have a Table when 
we call.
+  public V call(int callTimeout) throws IOException {
+    try {
+      this.rpcController.setCallTimeout(callTimeout);
+      return call(this.rpcController);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but it at 
a minimum is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can 
keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract V call(PayloadCarryingRpcController rpcController) throws 
Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
index e445b78..47693f4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
@@ -33,8 +33,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
  * against the master on the MasterProtos.MasterService.BlockingInterface; but 
not by
  * final user code. Hence it's package protected.
  */
-interface MasterKeepAliveConnection
-extends MasterProtos.MasterService.BlockingInterface {
+interface MasterKeepAliveConnection extends 
MasterProtos.MasterService.BlockingInterface {
   // Do this instead of implement Closeable because closeable returning IOE is 
PITA.
   void close();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index e764ceb..a3162f4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,8 +30,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -41,14 +42,14 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
 
 /**
  * Callable that handles the <code>multi</code> method call going against a 
single
- * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is 
not a
- * {@link RegionServerCallable} that goes against multiple regions.
+ * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a
+ * RegionServerCallable that goes against multiple regions).
  * @param <R>
  */
+@InterfaceAudience.Private
 class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
@@ -79,7 +80,7 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
   }
 
   @Override
-  public MultiResponse call(int callTimeout) throws IOException {
+  protected MultiResponse call(PayloadCarryingRpcController controller) throws 
Exception {
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -98,10 +99,8 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
       regionActionBuilder.clear();
       regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
           HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, 
regionName));
-
-
       if (this.cellBlock) {
-        // Presize.  Presume at least a KV per Action.  There are likely more.
+        // Pre-size. Presume at least a KV per Action.  There are likely more.
         if (cells == null) cells = new 
ArrayList<CellScannable>(countOfActions);
         // Send data in cellblocks. The call to buildNoDataMultiRequest will 
skip RowMutations.
         // They have already been handled above. Guess at count of cells
@@ -116,18 +115,18 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
 
     // Controller optionally carries cell data over the proxy/service boundary 
and also
     // optionally ferries cell response data back out again.
-    if (cells != null) 
controller.setCellScanner(CellUtil.createCellScanner(cells));
-    controller.setPriority(getTableName());
-    controller.setCallTimeout(callTimeout);
+    PayloadCarryingRpcController payloadCarryingRpcController = null;
+    if (cells != null) {
+      // Cast. Will fail if we have been passed wrong RpcController type.
+      payloadCarryingRpcController = (PayloadCarryingRpcController)controller;
+      
payloadCarryingRpcController.setCellScanner(CellUtil.createCellScanner(cells));
+    }
     ClientProtos.MultiResponse responseProto;
     ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
-    try {
-      responseProto = getStub().multi(controller, requestProto);
-    } catch (ServiceException e) {
-      throw ProtobufUtil.getRemoteException(e);
-    }
+    responseProto = getStub().multi(controller, requestProto);
     if (responseProto == null) return null; // Occurs on cancel
-    return ResponseConverter.getResults(requestProto, responseProto, 
controller.cellScanner());
+    return ResponseConverter.getResults(requestProto, responseProto,
+        payloadCarryingRpcController ==  null? null: 
payloadCarryingRpcController.cellScanner());
   }
 
   /**
@@ -151,4 +150,4 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
   ServerName getServerName() {
     return location.getServerName();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
index d94f069..83d857b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -16,33 +16,51 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 
 /**
- * This class is used to unify HTable calls with AsyncProcess Framework.
- * HTable can use AsyncProcess directly though this class.
+ * This class is used to unify HTable calls with AsyncProcess Framework. 
HTable can use
+ * AsyncProcess directly though this class. Also adds global timeout tracking 
on top of
+ * RegionServerCallable and implements Cancellable.
  */
 @InterfaceAudience.Private
-public abstract class PayloadCarryingServerCallable<T>
-    extends RegionServerCallable<T> implements Cancellable {
-  protected PayloadCarryingRpcController controller;
+abstract class PayloadCarryingServerCallable<T> extends RegionServerCallable<T>
+    implements Cancellable {
+  private final RetryingTimeTracker tracker = new RetryingTimeTracker();
+
+  PayloadCarryingServerCallable(Connection connection, TableName tableName, 
byte[] row,
+      RpcControllerFactory rpcControllerFactory) {
+    super(connection, rpcControllerFactory, tableName, row);
+  }
 
-  public PayloadCarryingServerCallable(Connection connection, TableName 
tableName, byte[] row,
-    RpcControllerFactory rpcControllerFactory) {
-    super(connection, tableName, row);
-    this.controller = rpcControllerFactory.newController();
+  /* Override so can mess with the callTimeout.
+   * (non-Javadoc)
+   * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    // It is expected (it seems) that tracker.start can be called multiple 
times (on each trip
+    // through the call when retrying). Also, we can call start and no need of 
a stop.
+    this.tracker.start();
+    int remainingTime = tracker.getRemainingTime(callTimeout);
+    if (remainingTime == 0) {
+      throw new DoNotRetryIOException("Timeout for mutate row");
+    }
+    return super.call(remainingTime);
   }
 
   @Override
   public void cancel() {
-    controller.startCancel();
+    getRpcController().startCancel();
   }
 
   @Override
   public boolean isCancelled() {
-    return controller.isCanceled();
+    return getRpcController().isCanceled();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
index 54c93a0..4e347dd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
@@ -27,31 +27,30 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Similar to {@link RegionServerCallable} but for the AdminService interface. 
This service callable
+ * Similar to RegionServerCallable but for the AdminService interface. This 
service callable
  * assumes a Table and row and thus does region locating similar to 
RegionServerCallable.
+ * Works against Admin stub rather than Client stub.
  */
 
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
   justification="stub used by ipc")
 @InterfaceAudience.Private
 public abstract class RegionAdminServiceCallable<T> implements 
RetryingCallable<T> {
-
-  protected final ClusterConnection connection;
-
-  protected final RpcControllerFactory rpcControllerFactory;
-
   protected AdminService.BlockingInterface stub;
+  protected final RpcControllerFactory rpcControllerFactory;
+  private PayloadCarryingRpcController controller = null;
 
+  protected final ClusterConnection connection;
   protected HRegionLocation location;
-
   protected final TableName tableName;
   protected final byte[] row;
   protected final int replicaId;
-
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   public RegionAdminServiceCallable(ClusterConnection connection,
@@ -82,16 +81,13 @@ public abstract class RegionAdminServiceCallable<T> 
implements RetryingCallable<
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
     if (reload || location == null) {
       location = getLocation(!reload);
     }
-
     if (location == null) {
       // With this exception, there will be a retry.
       throw new HBaseIOException(getExceptionMessage());
     }
-
     this.setStub(connection.getAdmin(location.getServerName()));
   }
 
@@ -167,7 +163,39 @@ public abstract class RegionAdminServiceCallable<T> 
implements RetryingCallable<
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
     }
-
     return rl;
   }
-}
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link 
IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add 
your rpc invocation.
+   */
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste 
so much boilerplate
+  // and so we contain references to protobuf. We can't set priority on the 
rpcController as
+  // we do in RegionServerCallable because we don't always have a Table when 
we call.
+  public T call(int callTimeout) throws IOException {
+    this.controller = rpcControllerFactory.newController();
+    this.controller.setPriority(this.tableName);
+    this.controller.setCallTimeout(callTimeout);
+    try {
+      return call(this.controller);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() {
+    return this.controller;
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but it at 
a minimum is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can 
keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract T call(PayloadCarryingRpcController rpcController) throws 
Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index d878bae..861b375 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,12 +23,20 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 
 /**
- * Implementations call a RegionServer and implement {@link #call(int)}.
- * Passed to a {@link RpcRetryingCaller} so we retry on fail.
- * TODO: this class is actually tied to one region, because most of the paths 
make use of
+ * Implementations make an rpc call against a RegionService via a protobuf 
Service.
+ * Implement #rpcCall(RpcController) and then call {@link #call(int)} to
+ * trigger the rpc. The {@link #call(int)} eventually invokes your
+ * #rpcCall(RpcController) meanwhile saving you having to write a bunch of
+ * boilerplate. The {@link #call(int)} implementation is from {@link 
RpcRetryingCaller} so rpcs are
+ * retried on fail.
+ *
+ * <p>TODO: this class is actually tied to one region, because most of the 
paths make use of
  *       the regioninfo part of location when building requests. The only 
reason it works for
  *       multi-region requests (e.g. batch) is that they happen to not use the 
region parts.
  *       This could be done cleaner (e.g. having a generic parameter and 2 
derived classes,
@@ -37,18 +44,27 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
  * @param <T> the class that the ServerCallable handles
  */
 @InterfaceAudience.Private
-public abstract class RegionServerCallable<T> extends 
AbstractRegionServerCallable<T> implements
-    RetryingCallable<T> {
-
+public abstract class RegionServerCallable<T> extends 
AbstractRegionServerCallable<T> {
   private ClientService.BlockingInterface stub;
+  private final PayloadCarryingRpcController rpcController;
 
   /**
    * @param connection Connection to use.
    * @param tableName Table name to which <code>row</code> belongs.
    * @param row The row we want in <code>tableName</code>.
    */
-  public RegionServerCallable(Connection connection, TableName tableName, byte 
[] row) {
+  public RegionServerCallable(Connection connection, RpcControllerFactory 
rpcControllerFactory,
+      TableName tableName, byte [] row) {
+    this(connection, rpcControllerFactory.newController(), tableName, row);
+  }
+
+  public RegionServerCallable(Connection connection, 
PayloadCarryingRpcController rpcController,
+      TableName tableName, byte [] row) {
     super(connection, tableName, row);
+    this.rpcController = rpcController;
+    if (this.rpcController != null) {
+      this.rpcController.setPriority(tableName);
+    }
   }
 
   void setClientByServiceName(ServerName service) throws IOException {
@@ -69,4 +85,42 @@ public abstract class RegionServerCallable<T> extends 
AbstractRegionServerCallab
   void setStub(final ClientService.BlockingInterface stub) {
     this.stub = stub;
   }
-}
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link 
IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add 
your rpc invocation.
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    if (this.rpcController != null) {
+      this.rpcController.setCallTimeout(callTimeout);
+    }
+    try {
+      return call(this.rpcController);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but it at 
a minimum is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can 
keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract T call(PayloadCarryingRpcController rpcController) throws 
Exception;
+
+  public PayloadCarryingRpcController getRpcController() {
+    return this.rpcController;
+  }
+
+  long getNonceGroup() {
+    return getConnection().getNonceGenerator().getNonceGroup();
+  }
+
+  long getNewNonce() {
+    return getConnection().getNonceGenerator().newNonce();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index 24288e6..b9438e6 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * Tracks the amount of time remaining for an operation.
  */
 class RetryingTimeTracker {
-
   private long globalStartTime = -1;
 
   public void start() {
@@ -38,16 +37,19 @@ class RetryingTimeTracker {
       if (callTimeout == Integer.MAX_VALUE) {
         return Integer.MAX_VALUE;
       }
-      int remainingTime = (int) (
-        callTimeout -
-        (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+      long remaining = EnvironmentEdgeManager.currentTime() - 
this.globalStartTime;
+      long remainingTime = callTimeout - remaining;
       if (remainingTime < 1) {
         // If there is no time left, we're trying anyway. It's too late.
         // 0 means no timeout, and it's not the intent here. So we secure both 
cases by
         // resetting to the minimum.
         remainingTime = 1;
       }
-      return remainingTime;
+      if (remainingTime > Integer.MAX_VALUE) {
+        throw new RuntimeException("remainingTime=" + remainingTime +
+            " which is > Integer.MAX_VALUE");
+      }
+      return (int)remainingTime;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index 0c2d345..644337d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -176,9 +176,9 @@ public class ReversedScannerCallable extends 
ScannerCallable {
 
   @Override
   public ScannerCallable getScannerCallableForReplica(int id) {
-    ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, 
this.tableName,
+    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), 
this.tableName,
         this.getScan(), this.scanMetrics, this.locateStartRow, 
controllerFactory, id);
     r.setCaching(this.getCaching());
     return r;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
new file mode 100644
index 0000000..68a4aa2
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+
+/**
+ * A RetryingCallable for RPC connection operations.
+ * @param <V> return type
+ */
+abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, 
Closeable {
+  @Override
+  public void prepare(boolean reload) throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public void throwable(Throwable t, boolean retrying) {
+  }
+
+  @Override
+  public String getExceptionMessageAdditionalDetail() {
+    return "";
+  }
+
+  @Override
+  public long sleep(long pause, int tries) {
+    return ConnectionUtils.getPauseTime(pause, tries);
+  }
+
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste 
so much boilerplate
+  // and so we contain references to protobuf.
+  public V call(int callTimeout) throws IOException {
+    try {
+      return rpcCall(callTimeout);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  protected abstract V rpcCall(int callTimeout) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index b4cd2ef..2b2e4c8 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -22,9 +22,6 @@ import 
org.apache.hadoop.hbase.classification.InterfaceStability;
 
 import java.io.IOException;
 
-/**
- *
- */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RpcRetryingCaller<T> {
@@ -52,4 +49,4 @@ public interface RpcRetryingCaller<T> {
    */
   T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
   throws IOException, RuntimeException;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 1c723c5..f92aeae 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -36,6 +36,7 @@ public class RpcRetryingCallerFactory {
   private final int rpcTimeout;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
+  /* These below data members are UNUSED!!!*/
   private final boolean enableBackPressure;
   private ServerStatisticTracker stats;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 65dbb10..2785648 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -29,8 +29,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.protobuf.ServiceException;
-
 
 /**
  * Caller that goes to replica if the primary region does no answer within a 
configurable
@@ -57,8 +53,6 @@ import com.google.protobuf.ServiceException;
  */
 @InterfaceAudience.Private
 public class RpcRetryingCallerWithReadReplicas {
-  private static final Log LOG = 
LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
-
   protected final ExecutorService pool;
   protected final ClusterConnection cConnection;
   protected final Configuration conf;
@@ -98,7 +92,7 @@ public class RpcRetryingCallerWithReadReplicas {
     private final PayloadCarryingRpcController controller;
 
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
-      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
+      super(RpcRetryingCallerWithReadReplicas.this.cConnection, 
rpcControllerFactory,
           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
       this.id = id;
       this.location = location;
@@ -141,28 +135,20 @@ public class RpcRetryingCallerWithReadReplicas {
     }
 
     @Override
-    public Result call(int callTimeout) throws Exception {
+    protected Result call(PayloadCarryingRpcController controller) throws 
Exception {
       if (controller.isCanceled()) return null;
-
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
-
       byte[] reg = location.getRegionInfo().getRegionName();
-
       ClientProtos.GetRequest request =
           RequestConverter.buildGetRequest(reg, get);
       controller.setCallTimeout(callTimeout);
-
-      try {
-        ClientProtos.GetResponse response = getStub().get(controller, request);
-        if (response == null) {
-          return null;
-        }
-        return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      ClientProtos.GetResponse response = getStub().get(controller, request);
+      if (response == null) {
+        return null;
       }
+      return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 72d69ec..1689d11 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -52,9 +52,6 @@ import 
org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
-
 /**
  * Scanner operations such as create, next, etc.
  * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying 
caller such as
@@ -74,7 +71,6 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
   protected boolean renew = false;
   private Scan scan;
   private int caching = 1;
-  protected final ClusterConnection cConnection;
   protected ScanMetrics scanMetrics;
   private boolean logScannerActivity = false;
   private int logCutOffLatency = 1000;
@@ -125,9 +121,8 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
    */
   public ScannerCallable(ClusterConnection connection, TableName tableName, 
Scan scan,
       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int 
id) {
-    super(connection, tableName, scan.getStartRow());
+    super(connection, rpcControllerFactory, tableName, scan.getStartRow());
     this.id = id;
-    this.cConnection = connection;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
     Configuration conf = connection.getConfiguration();
@@ -185,25 +180,16 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
     }
   }
 
-
-  @Override
-  public Result [] call(int callTimeout) throws IOException {
+  protected Result [] call(PayloadCarryingRpcController controller) throws 
Exception {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
-    if (controller == null) {
-      controller = controllerFactory.newController();
-      controller.setPriority(getTableName());
-      controller.setCallTimeout(callTimeout);
-    }
-
-    if (closed) {
-      if (scannerId != -1) {
+    if (this.closed) {
+      if (this.scannerId != -1) {
         close();
       }
     } else {
-      if (scannerId == -1L) {
+      if (this.scannerId == -1L) {
         this.scannerId = openScanner();
       } else {
         Result [] rrs = null;
@@ -212,61 +198,56 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
         setHeartbeatMessage(false);
         try {
           incRPCcallsMetrics();
-          request =
-              RequestConverter.buildScanRequest(scannerId, caching, false, 
nextCallSeq,
+          request = RequestConverter.buildScanRequest(scannerId, caching, 
false, nextCallSeq,
                 this.scanMetrics != null, renew);
           ScanResponse response = null;
-          try {
-            response = getStub().scan(controller, request);
-            // Client and RS maintain a nextCallSeq number during the scan. 
Every next() call
-            // from client to server will increment this number in both sides. 
Client passes this
-            // number along with the request and at RS side both the incoming 
nextCallSeq and its
-            // nextCallSeq will be matched. In case of a timeout this 
increment at the client side
-            // should not happen. If at the server side fetching of next batch 
of data was over,
-            // there will be mismatch in the nextCallSeq number. Server will 
throw
-            // OutOfOrderScannerNextException and then client will reopen the 
scanner with startrow
-            // as the last successfully retrieved row.
-            // See HBASE-5974
-            nextCallSeq++;
-            long timestamp = System.currentTimeMillis();
-            setHeartbeatMessage(response.hasHeartbeatMessage() && 
response.getHeartbeatMessage());
-            // Results are returned via controller
-            CellScanner cellScanner = controller.cellScanner();
-            rrs = ResponseConverter.getResults(cellScanner, response);
-            if (logScannerActivity) {
-              long now = System.currentTimeMillis();
-              if (now - timestamp > logCutOffLatency) {
-                int rows = rrs == null ? 0 : rrs.length;
-                LOG.info("Took " + (now-timestamp) + "ms to fetch "
+          response = getStub().scan(controller, request);
+          // Client and RS maintain a nextCallSeq number during the scan. 
Every next() call
+          // from client to server will increment this number in both sides. 
Client passes this
+          // number along with the request and at RS side both the incoming 
nextCallSeq and its
+          // nextCallSeq will be matched. In case of a timeout this increment 
at the client side
+          // should not happen. If at the server side fetching of next batch 
of data was over,
+          // there will be mismatch in the nextCallSeq number. Server will 
throw
+          // OutOfOrderScannerNextException and then client will reopen the 
scanner with startrow
+          // as the last successfully retrieved row.
+          // See HBASE-5974
+          nextCallSeq++;
+          long timestamp = System.currentTimeMillis();
+          setHeartbeatMessage(response.hasHeartbeatMessage() && 
response.getHeartbeatMessage());
+          // Results are returned via controller
+          CellScanner cellScanner = controller.cellScanner();
+          rrs = ResponseConverter.getResults(cellScanner, response);
+          if (logScannerActivity) {
+            long now = System.currentTimeMillis();
+            if (now - timestamp > logCutOffLatency) {
+              int rows = rrs == null ? 0 : rrs.length;
+              LOG.info("Took " + (now-timestamp) + "ms to fetch "
                   + rows + " rows from scanner=" + scannerId);
-              }
-            }
-            updateServerSideMetrics(response);
-            // moreResults is only used for the case where a filter exhausts 
all elements
-            if (response.hasMoreResults() && !response.getMoreResults()) {
-              scannerId = -1L;
-              closed = true;
-              // Implied that no results were returned back, either.
-              return null;
             }
-            // moreResultsInRegion explicitly defines when a RS may choose to 
terminate a batch due
-            // to size or quantity of results in the response.
-            if (response.hasMoreResultsInRegion()) {
-              // Set what the RS said
-              setHasMoreResultsContext(true);
-              setServerHasMoreResults(response.getMoreResultsInRegion());
-            } else {
-              // Server didn't respond whether it has more results or not.
-              setHasMoreResultsContext(false);
-            }
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+          }
+          updateServerSideMetrics(response);
+          // moreResults is only used for the case where a filter exhausts all 
elements
+          if (response.hasMoreResults() && !response.getMoreResults()) {
+            this.scannerId = -1L;
+            this.closed = true;
+            // Implied that no results were returned back, either.
+            return null;
+          }
+          // moreResultsInRegion explicitly defines when a RS may choose to 
terminate a batch due
+          // to size or quantity of results in the response.
+          if (response.hasMoreResultsInRegion()) {
+            // Set what the RS said
+            setHasMoreResultsContext(true);
+            setServerHasMoreResults(response.getMoreResultsInRegion());
+          } else {
+            // Server didn't respond whether it has more results or not.
+            setHasMoreResultsContext(false);
           }
           updateResultsMetrics(rrs);
         } catch (IOException e) {
           if (logScannerActivity) {
-            LOG.info("Got exception making request " + 
TextFormat.shortDebugString(request)
-              + " to " + getLocation(), e);
+            LOG.info("Got exception making request " + 
ProtobufUtil.toText(request) + " to " +
+                getLocation(), e);
           }
           IOException ioe = e;
           if (e instanceof RemoteException) {
@@ -275,9 +256,9 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
             try {
               HRegionLocation location =
-                getConnection().relocateRegion(getTableName(), 
scan.getStartRow());
-              LOG.info("Scanner=" + scannerId
-                + " expired, current region location is " + 
location.toString());
+                  getConnection().relocateRegion(getTableName(), 
scan.getStartRow());
+              LOG.info("Scanner=" + scannerId + " expired, current region 
location is " +
+                  location.toString());
             } catch (Throwable t) {
               LOG.info("Failed to relocate region", t);
             }
@@ -376,8 +357,8 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
           RequestConverter.buildScanRequest(this.scannerId, 0, true, 
this.scanMetrics != null);
       try {
         getStub().scan(controller, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
       }
     } catch (IOException e) {
       LOG.warn("Ignore, probably already closed", e);
@@ -387,10 +368,8 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
 
   protected long openScanner() throws IOException {
     incRPCcallsMetrics();
-    ScanRequest request =
-      RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(),
-        this.scan, 0, false);
+    ScanRequest request = RequestConverter.buildScanRequest(
+        getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
     try {
       ScanResponse response = getStub().scan(controller, request);
       long id = response.getScannerId();
@@ -399,8 +378,8 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
           + " on region " + getLocation().toString());
       }
       return id;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
@@ -443,11 +422,6 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
     return caching;
   }
 
-  @Override
-  public ClusterConnection getConnection() {
-    return cConnection;
-  }
-
   /**
    * Set the number of rows that will be fetched on next
    * @param caching the number of rows for caching
@@ -488,4 +462,4 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) 
{
     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7b1547d..d6896e1 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -38,41 +41,35 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.token.Token;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * Client proxy for SecureBulkLoadProtocol
  */
 @InterfaceAudience.Private
 public class SecureBulkLoadClient {
   private Table table;
+  private final RpcControllerFactory rpcControllerFactory;
 
-  public SecureBulkLoadClient(Table table) {
+  public SecureBulkLoadClient(final Configuration conf, Table table) {
     this.table = table;
+    this.rpcControllerFactory = new RpcControllerFactory(conf);
   }
 
   public String prepareBulkLoad(final Connection conn) throws IOException {
     try {
-      RegionServerCallable<String> callable =
-          new RegionServerCallable<String>(conn, table.getName(), 
HConstants.EMPTY_START_ROW) {
-            @Override
-            public String call(int callTimeout) throws IOException {
-              byte[] regionName = 
getLocation().getRegionInfo().getRegionName();
-              RegionSpecifier region =
-                  RequestConverter
-                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName);
-              try {
-                PrepareBulkLoadRequest request =
-                    PrepareBulkLoadRequest.newBuilder()
-                        
.setTableName(ProtobufUtil.toProtoTableName(table.getName()))
-                        .setRegion(region).build();
-                PrepareBulkLoadResponse response = 
getStub().prepareBulkLoad(null, request);
-                return response.getBulkToken();
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-            }
-          };
+      RegionServerCallable<String> callable = new 
RegionServerCallable<String>(conn,
+          this.rpcControllerFactory, table.getName(), 
HConstants.EMPTY_START_ROW) {
+        @Override
+        protected String call(PayloadCarryingRpcController controller) throws 
Exception {
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          RegionSpecifier region =
+              
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName);
+          PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
+              .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
+              .setRegion(region).build();
+          PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, 
request);
+          return response.getBulkToken();
+        }
+      };
       return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), 
null)
           .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -82,24 +79,19 @@ public class SecureBulkLoadClient {
 
   public void cleanupBulkLoad(final Connection conn, final String bulkToken) 
throws IOException {
     try {
-      RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, table.getName(), 
HConstants.EMPTY_START_ROW) {
-            @Override
-            public Void call(int callTimeout) throws IOException {
-              byte[] regionName = 
getLocation().getRegionInfo().getRegionName();
-              RegionSpecifier region = RequestConverter.buildRegionSpecifier(
-                RegionSpecifierType.REGION_NAME, regionName);
-              try {
-                CleanupBulkLoadRequest request =
-                    CleanupBulkLoadRequest.newBuilder().setRegion(region)
-                        .setBulkToken(bulkToken).build();
-                getStub().cleanupBulkLoad(null, request);
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-              return null;
-            }
-          };
+      RegionServerCallable<Void> callable = new 
RegionServerCallable<Void>(conn,
+          this.rpcControllerFactory, table.getName(), 
HConstants.EMPTY_START_ROW) {
+        @Override
+        protected Void call(PayloadCarryingRpcController controller) throws 
Exception {
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          RegionSpecifier region = RequestConverter.buildRegionSpecifier(
+              RegionSpecifierType.REGION_NAME, regionName);
+          CleanupBulkLoadRequest request =
+              
CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
+          getStub().cleanupBulkLoad(null, request);
+          return null;
+        }
+      };
       RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
           .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -130,12 +122,12 @@ public class SecureBulkLoadClient {
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
       return response.getLoaded();
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception se) {
+      throw ProtobufUtil.handleRemoteException(se);
     }
   }
 
   public Path getStagingPath(String bulkToken, byte[] family) throws 
IOException {
     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), 
bulkToken, family);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
index 6fae5cb..a6384e3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
@@ -77,5 +77,4 @@ public class MasterCoprocessorRpcChannel extends 
SyncCoprocessorRpcChannel {
     }
     return response;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index f4f18b3..6c290a6 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -17,24 +17,39 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
 
 /**
  * Optionally carries Cells across the proxy/service interface down into ipc. 
On its
- * way out it optionally carries a set of result Cell data.  We stick the 
Cells here when we want
- * to avoid having to protobuf them.  This class is used ferrying data across 
the proxy/protobuf
- * service chasm.  Used by client and server ipc'ing.
+ * way out it optionally carries a set of result Cell data. We stick the Cells 
here when we want
+ * to avoid having to protobuf them (for performance reasons). This class is 
used ferrying data
+ * across the proxy/protobuf service chasm. Also does call timeout. Used by 
client and server
+ * ipc'ing.
  */
 @InterfaceAudience.Private
-public class PayloadCarryingRpcController
-    extends TimeLimitedRpcController implements CellScannable {
+public class PayloadCarryingRpcController implements RpcController, 
CellScannable {
+  /**
+   * The time, in ms before the call should expire.
+   */
+  protected volatile Integer callTimeout;
+  protected volatile boolean cancelled = false;
+  protected final AtomicReference<RpcCallback<Object>> cancellationCb = new 
AtomicReference<>(null);
+  protected final AtomicReference<RpcCallback<IOException>> failureCb = new 
AtomicReference<>(null);
+  private IOException exception;
 
   public static final int PRIORITY_UNSET = -1;
   /**
@@ -93,15 +108,123 @@ public class PayloadCarryingRpcController
   }
 
   /**
+   * @param regionName RegionName. If hbase:meta, we'll set high priority.
+   */
+  public void setPriority(final byte [] regionName) {
+    if (isMetaRegion(regionName)) {
+      setPriority(TableName.META_TABLE_NAME);
+    }
+  }
+
+  private static boolean isMetaRegion(final byte[] regionName) {
+    return Bytes.equals(regionName, 
HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+        || Bytes.equals(regionName, 
HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+  }
+
+  /**
    * @return The priority of this request
    */
   public int getPriority() {
     return priority;
   }
 
-  @Override public void reset() {
-    super.reset();
+  @Override
+  public void reset() {
     priority = 0;
     cellScanner = null;
+    exception = null;
+    cancelled = false;
+    failureCb.set(null);
+    cancellationCb.set(null);
+    callTimeout = null;
+  }
+
+  public int getCallTimeout() {
+    if (callTimeout != null) {
+      return callTimeout;
+    } else {
+      return 0;
+    }
+  }
+
+  public void setCallTimeout(int callTimeout) {
+    this.callTimeout = callTimeout;
+  }
+
+  public boolean hasCallTimeout(){
+    return callTimeout != null;
+  }
+
+  @Override
+  public String errorText() {
+    if (exception != null) {
+      return exception.getMessage();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * For use in async rpc clients
+   * @return true if failed
+   */
+  @Override
+  public boolean failed() {
+    return this.exception != null;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return cancelled;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
+    this.cancellationCb.set(cancellationCb);
+    if (this.cancelled) {
+      cancellationCb.run(null);
+    }
+  }
+
+  /**
+   * Notify a callback on error.
+   * For use in async rpc clients
+   *
+   * @param failureCb the callback to call on error
+   */
+  public void notifyOnFail(RpcCallback<IOException> failureCb) {
+    this.failureCb.set(failureCb);
+    if (this.exception != null) {
+      failureCb.run(this.exception);
+    }
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    this.exception = new IOException(reason);
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  /**
+   * Set failed with an exception to pass on.
+   * For use in async rpc clients
+   *
+   * @param e exception to set with
+   */
+  public void setFailed(IOException e) {
+    this.exception = e;
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  @Override
+  public void startCancel() {
+    cancelled = true;
+    if (cancellationCb.get() != null) {
+      cancellationCb.get().run(null);
+    }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 55d6375..dbc9041 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -76,30 +76,23 @@ public class RegionCoprocessorRpcChannel extends 
SyncCoprocessorRpcChannel {
       Descriptors.MethodDescriptor method, Message request, Message 
responsePrototype)
           throws IOException {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Call: "+method.getName()+", "+request.toString());
+      LOG.trace("Call: " + method.getName() + ", " + request.toString());
     }
-
     if (row == null) {
       throw new IllegalArgumentException("Missing row property for remote 
region location");
     }
-
-    final RpcController rpcController = controller == null
-        ? rpcControllerFactory.newController() : controller;
-
     final ClientProtos.CoprocessorServiceCall call =
         CoprocessorRpcUtils.buildServiceCall(row, method, request);
     RegionServerCallable<CoprocessorServiceResponse> callable =
-        new RegionServerCallable<CoprocessorServiceResponse>(connection, 
table, row) {
+        new RegionServerCallable<CoprocessorServiceResponse>(connection,
+          controller == null? this.rpcControllerFactory.newController():
+            (PayloadCarryingRpcController)controller,
+          table, row) {
       @Override
-      public CoprocessorServiceResponse call(int callTimeout) throws Exception 
{
-        if (rpcController instanceof PayloadCarryingRpcController) {
-          ((PayloadCarryingRpcController) 
rpcController).setPriority(tableName);
-        }
-        if (rpcController instanceof TimeLimitedRpcController) {
-          ((TimeLimitedRpcController) 
rpcController).setCallTimeout(callTimeout);
-        }
+      protected CoprocessorServiceResponse call(PayloadCarryingRpcController 
controller)
+      throws Exception {
         byte[] regionName = getLocation().getRegionInfo().getRegionName();
-        return ProtobufUtil.execService(rpcController, getStub(), call, 
regionName);
+        return ProtobufUtil.execService(getRpcController(), getStub(), call, 
regionName);
       }
     };
     CoprocessorServiceResponse result = 
rpcCallerFactory.<CoprocessorServiceResponse> newCaller()

Reply via email to