http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 1b3e111..fbd9f51 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -18,6 +18,12 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -37,6 +43,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -67,16 +74,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
-import com.google.common.annotations.VisibleForTesting;
-
-// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS 
ONLY.
-// Internally, we use shaded protobuf. This below are part of our public API.
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.Service;
-// SEE ABOVE NOTE!
-
 /**
  * An implementation of {@link Table}. Used to communicate with a single HBase 
table.
  * Lightweight. Get as needed and just close when done.
@@ -414,16 +411,23 @@ public class HTable implements Table {
 
     if (get.getConsistency() == Consistency.STRONG) {
       // Good old call.
-      final Get configuredGet = get;
+      final Get getReq = get;
       RegionServerCallable<Result> callable = new 
RegionServerCallable<Result>(this.connection,
-          this.rpcControllerFactory, getName(), get.getRow()) {
+          getName(), get.getRow()) {
         @Override
-        protected Result call(PayloadCarryingRpcController controller) throws 
Exception {
-          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
-              getLocation().getRegionInfo().getRegionName(), configuredGet);
-          ClientProtos.GetResponse response = getStub().get(controller, 
request);
-          if (response == null) return null;
-          return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+        public Result call(int callTimeout) throws IOException {
+          ClientProtos.GetRequest request =
+            
RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), 
getReq);
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
+          try {
+            ClientProtos.GetResponse response = getStub().get(controller, 
request);
+            if (response == null) return null;
+            return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
         }
       };
       return 
rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
@@ -439,6 +443,7 @@ public class HTable implements Table {
     return callable.call(operationTimeout);
   }
 
+
   /**
    * {@inheritDoc}
    */
@@ -449,14 +454,16 @@ public class HTable implements Table {
     }
     try {
       Object[] r1 = new Object[gets.size()];
-      batch((List<? extends Row>)gets, r1);
-      // Translate.
+      batch((List) gets, r1);
+
+      // translate.
       Result [] results = new Result[r1.length];
-      int i = 0;
-      for (Object obj: r1) {
-        // Batch ensures if there is a failure we get an exception instead
-        results[i++] = (Result)obj;
+      int i=0;
+      for (Object o : r1) {
+        // batch ensures if there is a failure we get an exception instead
+        results[i++] = (Result) o;
       }
+
       return results;
     } catch (InterruptedException e) {
       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
@@ -504,13 +511,21 @@ public class HTable implements Table {
   public void delete(final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(connection,
-        this.rpcControllerFactory, getName(), delete.getRow()) {
+        tableName, delete.getRow()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
-        MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), delete);
-        MutateResponse response = getStub().mutate(controller, request);
-        return Boolean.valueOf(response.getProcessed());
+      public Boolean call(int callTimeout) throws IOException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setPriority(tableName);
+        controller.setCallTimeout(callTimeout);
+
+        try {
+          MutateRequest request = RequestConverter.buildMutateRequest(
+            getLocation().getRegionInfo().getRegionName(), delete);
+          MutateResponse response = getStub().mutate(controller, request);
+          return Boolean.valueOf(response.getProcessed());
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
+        }
       }
     };
     rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
@@ -566,28 +581,41 @@ public class HTable implements Table {
    */
   @Override
   public void mutateRow(final RowMutations rm) throws IOException {
+    final RetryingTimeTracker tracker = new RetryingTimeTracker();
     PayloadCarryingServerCallable<MultiResponse> callable =
-      new PayloadCarryingServerCallable<MultiResponse>(this.connection, 
getName(), rm.getRow(),
+      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), 
rm.getRow(),
           rpcControllerFactory) {
-      @Override
-      protected MultiResponse call(PayloadCarryingRpcController controller) 
throws Exception {
-        RegionAction.Builder regionMutationBuilder = 
RequestConverter.buildRegionAction(
-            getLocation().getRegionInfo().getRegionName(), rm);
-        regionMutationBuilder.setAtomic(true);
-        MultiRequest request =
-            
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-        ClientProtos.MultiResponse response = getStub().multi(controller, 
request);
-        ClientProtos.RegionActionResult res = 
response.getRegionActionResultList().get(0);
-        if (res.hasException()) {
-          Throwable ex = ProtobufUtil.toException(res.getException());
-          if (ex instanceof IOException) {
-            throw (IOException) ex;
+        @Override
+        public MultiResponse call(int callTimeout) throws IOException {
+          tracker.start();
+          controller.setPriority(tableName);
+          int remainingTime = tracker.getRemainingTime(callTimeout);
+          if (remainingTime == 0) {
+            throw new DoNotRetryIOException("Timeout for mutate row");
+          }
+          controller.setCallTimeout(remainingTime);
+          try {
+            RegionAction.Builder regionMutationBuilder = 
RequestConverter.buildRegionAction(
+                getLocation().getRegionInfo().getRegionName(), rm);
+            regionMutationBuilder.setAtomic(true);
+            MultiRequest request =
+                
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+            ClientProtos.MultiResponse response = getStub().multi(controller, 
request);
+            ClientProtos.RegionActionResult res = 
response.getRegionActionResultList().get(0);
+            if (res.hasException()) {
+              Throwable ex = ProtobufUtil.toException(res.getException());
+              if (ex instanceof IOException) {
+                throw (IOException) ex;
+              }
+              throw new IOException("Failed to mutate row: " +
+                  Bytes.toStringBinary(rm.getRow()), ex);
+            }
+            return ResponseConverter.getResults(request, response, 
controller.cellScanner());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
           }
-          throw new IOException("Failed to mutate row: " + 
Bytes.toStringBinary(rm.getRow()), ex);
         }
-        return ResponseConverter.getResults(request, response, 
controller.cellScanner());
-      }
-    };
+      };
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, 
rm.getMutations(),
         null, null, callable, operationTimeout);
     ars.waitUntilDone();
@@ -596,31 +624,38 @@ public class HTable implements Table {
     }
   }
 
-  private static void checkHasFamilies(final Mutation mutation) throws 
IOException {
-    if (mutation.numFamilies() == 0) {
-      throw new IOException("Invalid arguments to " + mutation + ", zero 
columns specified");
-    }
-  }
-
   /**
    * {@inheritDoc}
    */
   @Override
   public Result append(final Append append) throws IOException {
-    checkHasFamilies(append);
-    RegionServerCallable<Result> callable = new 
RegionServerCallable<Result>(this.connection,
-        this.rpcControllerFactory, getName(), append.getRow()) {
-      @Override
-      protected Result call(PayloadCarryingRpcController controller) throws 
Exception {
-        MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), append, 
getNonceGroup(), getNewNonce());
-        MutateResponse response = getStub().mutate(controller, request);
-        if (!response.hasResult()) return null;
-        return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
-      }
-    };
-    return rpcCallerFactory.<Result> newCaller(this.rpcTimeout).
-        callWithRetries(callable, this.operationTimeout);
+    if (append.numFamilies() == 0) {
+      throw new IOException(
+          "Invalid arguments to append, no columns specified");
+    }
+
+    NonceGenerator ng = this.connection.getNonceGenerator();
+    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
+    RegionServerCallable<Result> callable =
+      new RegionServerCallable<Result>(this.connection, getName(), 
append.getRow()) {
+        @Override
+        public Result call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setPriority(getTableName());
+          controller.setCallTimeout(callTimeout);
+          try {
+            MutateRequest request = RequestConverter.buildMutateRequest(
+              getLocation().getRegionInfo().getRegionName(), append, 
nonceGroup, nonce);
+            MutateResponse response = getStub().mutate(controller, request);
+            if (!response.hasResult()) return null;
+            return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+        }
+      };
+    return rpcCallerFactory.<Result> 
newCaller(rpcTimeout).callWithRetries(callable,
+        this.operationTimeout);
   }
 
   /**
@@ -628,16 +663,27 @@ public class HTable implements Table {
    */
   @Override
   public Result increment(final Increment increment) throws IOException {
-    checkHasFamilies(increment);
+    if (!increment.hasFamilies()) {
+      throw new IOException(
+          "Invalid arguments to increment, no columns specified");
+    }
+    NonceGenerator ng = this.connection.getNonceGenerator();
+    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
     RegionServerCallable<Result> callable = new 
RegionServerCallable<Result>(this.connection,
-        this.rpcControllerFactory, getName(), increment.getRow()) {
+        getName(), increment.getRow()) {
       @Override
-      protected Result call(PayloadCarryingRpcController controller) throws 
Exception {
-        MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), increment, 
getNonceGroup(), getNewNonce());
-        MutateResponse response = getStub().mutate(controller, request);
-        // Should this check for null like append does?
-        return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+      public Result call(int callTimeout) throws IOException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setPriority(getTableName());
+        controller.setCallTimeout(callTimeout);
+        try {
+          MutateRequest request = RequestConverter.buildMutateRequest(
+            getLocation().getRegionInfo().getRegionName(), increment, 
nonceGroup, nonce);
+          MutateResponse response = getStub().mutate(controller, request);
+          return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
+        }
       }
     };
     return rpcCallerFactory.<Result> 
newCaller(rpcTimeout).callWithRetries(callable,
@@ -676,20 +722,28 @@ public class HTable implements Table {
 
     NonceGenerator ng = this.connection.getNonceGenerator();
     final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Long> callable = new 
RegionServerCallable<Long>(this.connection,
-        this.rpcControllerFactory, getName(), row) {
-      @Override
-      protected Long call(PayloadCarryingRpcController controller) throws 
Exception {
-        MutateRequest request = RequestConverter.buildIncrementRequest(
-          getLocation().getRegionInfo().getRegionName(), row, family,
-          qualifier, amount, durability, nonceGroup, nonce);
-        MutateResponse response = getStub().mutate(controller, request);
-        Result result = ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
-        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
-      }
-    };
-    return rpcCallerFactory.<Long> newCaller(rpcTimeout).
-        callWithRetries(callable, this.operationTimeout);
+    RegionServerCallable<Long> callable =
+      new RegionServerCallable<Long>(connection, getName(), row) {
+        @Override
+        public Long call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setPriority(getTableName());
+          controller.setCallTimeout(callTimeout);
+          try {
+            MutateRequest request = RequestConverter.buildIncrementRequest(
+              getLocation().getRegionInfo().getRegionName(), row, family,
+              qualifier, amount, durability, nonceGroup, nonce);
+            MutateResponse response = getStub().mutate(controller, request);
+            Result result =
+              ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+            return Long.valueOf(Bytes.toLong(result.getValue(family, 
qualifier)));
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+        }
+      };
+    return rpcCallerFactory.<Long> 
newCaller(rpcTimeout).callWithRetries(callable,
+        this.operationTimeout);
   }
 
   /**
@@ -700,19 +754,26 @@ public class HTable implements Table {
       final byte [] family, final byte [] qualifier, final byte [] value,
       final Put put)
   throws IOException {
-    RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(this.connection,
-        this.rpcControllerFactory, getName(), row) {
-      @Override
-      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
-        MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-          new BinaryComparator(value), CompareType.EQUAL, put);
-        MutateResponse response = getStub().mutate(controller, request);
-        return Boolean.valueOf(response.getProcessed());
-      }
-    };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
-        callWithRetries(callable, this.operationTimeout);
+    RegionServerCallable<Boolean> callable =
+      new RegionServerCallable<Boolean>(connection, getName(), row) {
+        @Override
+        public Boolean call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
+          try {
+            MutateRequest request = RequestConverter.buildMutateRequest(
+                getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+                new BinaryComparator(value), CompareType.EQUAL, put);
+            MutateResponse response = getStub().mutate(controller, request);
+            return Boolean.valueOf(response.getProcessed());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+        }
+      };
+    return rpcCallerFactory.<Boolean> 
newCaller(rpcTimeout).callWithRetries(callable,
+        this.operationTimeout);
   }
 
   /**
@@ -723,42 +784,57 @@ public class HTable implements Table {
       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
       final Put put)
   throws IOException {
-    RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(this.connection,
-        this.rpcControllerFactory, getName(), row) {
-      @Override
-      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
-        CompareType compareType = CompareType.valueOf(compareOp.name());
-        MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-          new BinaryComparator(value), compareType, put);
-        MutateResponse response = getStub().mutate(controller, request);
-        return Boolean.valueOf(response.getProcessed());
-      }
-    };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
-        callWithRetries(callable, this.operationTimeout);
+    RegionServerCallable<Boolean> callable =
+      new RegionServerCallable<Boolean>(connection, getName(), row) {
+        @Override
+        public Boolean call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = new 
PayloadCarryingRpcController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
+          try {
+            CompareType compareType = CompareType.valueOf(compareOp.name());
+            MutateRequest request = RequestConverter.buildMutateRequest(
+              getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+                new BinaryComparator(value), compareType, put);
+            MutateResponse response = getStub().mutate(controller, request);
+            return Boolean.valueOf(response.getProcessed());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+        }
+      };
+    return rpcCallerFactory.<Boolean> 
newCaller(rpcTimeout).callWithRetries(callable,
+        this.operationTimeout);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean checkAndDelete(final byte [] row, final byte [] family, final 
byte [] qualifier,
-      final byte [] value, final Delete delete)
+  public boolean checkAndDelete(final byte [] row,
+      final byte [] family, final byte [] qualifier, final byte [] value,
+      final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(this.connection,
-        this.rpcControllerFactory, getName(), row) {
-      @Override
-      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
-        MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-          new BinaryComparator(value), CompareType.EQUAL, delete);
-        MutateResponse response = getStub().mutate(controller, request);
-        return Boolean.valueOf(response.getProcessed());
-      }
-    };
-    return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).
-        callWithRetries(callable, this.operationTimeout);
+    RegionServerCallable<Boolean> callable =
+      new RegionServerCallable<Boolean>(connection, getName(), row) {
+        @Override
+        public Boolean call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
+          try {
+            MutateRequest request = RequestConverter.buildMutateRequest(
+              getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+                new BinaryComparator(value), CompareType.EQUAL, delete);
+            MutateResponse response = getStub().mutate(controller, request);
+            return Boolean.valueOf(response.getProcessed());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+        }
+      };
+    return rpcCallerFactory.<Boolean> 
newCaller(rpcTimeout).callWithRetries(callable,
+        this.operationTimeout);
   }
 
   /**
@@ -769,18 +845,25 @@ public class HTable implements Table {
       final byte [] qualifier, final CompareOp compareOp, final byte [] value,
       final Delete delete)
   throws IOException {
-    RegionServerCallable<Boolean> callable = new 
RegionServerCallable<Boolean>(this.connection,
-        this.rpcControllerFactory, getName(), row) {
-      @Override
-      protected Boolean call(PayloadCarryingRpcController controller) throws 
Exception {
-        CompareType compareType = CompareType.valueOf(compareOp.name());
-        MutateRequest request = RequestConverter.buildMutateRequest(
-          getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-          new BinaryComparator(value), compareType, delete);
-        MutateResponse response = getStub().mutate(controller, request);
-        return Boolean.valueOf(response.getProcessed());
-      }
-    };
+    RegionServerCallable<Boolean> callable =
+      new RegionServerCallable<Boolean>(connection, getName(), row) {
+        @Override
+        public Boolean call(int callTimeout) throws IOException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setPriority(tableName);
+          controller.setCallTimeout(callTimeout);
+          try {
+            CompareType compareType = CompareType.valueOf(compareOp.name());
+            MutateRequest request = RequestConverter.buildMutateRequest(
+              getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+                new BinaryComparator(value), compareType, delete);
+            MutateResponse response = getStub().mutate(controller, request);
+            return Boolean.valueOf(response.getProcessed());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+        }
+      };
     return rpcCallerFactory.<Boolean> 
newCaller(rpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
@@ -792,28 +875,40 @@ public class HTable implements Table {
   public boolean checkAndMutate(final byte [] row, final byte [] family, final 
byte [] qualifier,
     final CompareOp compareOp, final byte [] value, final RowMutations rm)
     throws IOException {
+    final RetryingTimeTracker tracker = new RetryingTimeTracker();
     PayloadCarryingServerCallable<MultiResponse> callable =
       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), 
rm.getRow(),
         rpcControllerFactory) {
         @Override
-        protected MultiResponse call(PayloadCarryingRpcController controller) 
throws Exception {
-          CompareType compareType = CompareType.valueOf(compareOp.name());
-          MultiRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
-            new BinaryComparator(value), compareType, rm);
-          ClientProtos.MultiResponse response = getStub().multi(controller, 
request);
-          ClientProtos.RegionActionResult res = 
response.getRegionActionResultList().get(0);
-          if (res.hasException()) {
-            Throwable ex = ProtobufUtil.toException(res.getException());
-            if (ex instanceof IOException) {
-              throw (IOException)ex;
+        public MultiResponse call(int callTimeout) throws IOException {
+          tracker.start();
+          controller.setPriority(tableName);
+          int remainingTime = tracker.getRemainingTime(callTimeout);
+          if (remainingTime == 0) {
+            throw new DoNotRetryIOException("Timeout for mutate row");
+          }
+          controller.setCallTimeout(remainingTime);
+          try {
+            CompareType compareType = CompareType.valueOf(compareOp.name());
+            MultiRequest request = RequestConverter.buildMutateRequest(
+              getLocation().getRegionInfo().getRegionName(), row, family, 
qualifier,
+              new BinaryComparator(value), compareType, rm);
+            ClientProtos.MultiResponse response = getStub().multi(controller, 
request);
+            ClientProtos.RegionActionResult res = 
response.getRegionActionResultList().get(0);
+            if (res.hasException()) {
+              Throwable ex = ProtobufUtil.toException(res.getException());
+              if(ex instanceof IOException) {
+                throw (IOException)ex;
+              }
+              throw new IOException("Failed to checkAndMutate row: "+
+                                    Bytes.toStringBinary(rm.getRow()), ex);
             }
-            throw new IOException("Failed to checkAndMutate row: "+ 
Bytes.toStringBinary(rm.getRow()), ex);
+            return ResponseConverter.getResults(request, response, 
controller.cellScanner());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
           }
-          return ResponseConverter.getResults(request, response, 
controller.cellScanner());
         }
       };
-
     /**
      *  Currently, we use one array to store 'processed' flag which is 
returned by server.
      *  It is excessive to send such a large array, but that is required by 
the framework right now
@@ -873,6 +968,7 @@ public class HTable implements Table {
   }
 
   /**
+   * {@inheritDoc}
    * @throws IOException
    */
   void flushCommits() throws IOException {
@@ -1049,18 +1145,19 @@ public class HTable implements Table {
     for (final byte[] r : keys) {
       final RegionCoprocessorRpcChannel channel =
           new RegionCoprocessorRpcChannel(connection, tableName, r);
-      Future<R> future = pool.submit(new Callable<R>() {
-        @Override
-        public R call() throws Exception {
-          T instance = ProtobufUtil.newServiceStub(service, channel);
-          R result = callable.call(instance);
-          byte[] region = channel.getLastRegion();
-          if (callback != null) {
-            callback.update(region, r, result);
-          }
-          return result;
-        }
-      });
+      Future<R> future = pool.submit(
+          new Callable<R>() {
+            @Override
+            public R call() throws Exception {
+              T instance = ProtobufUtil.newServiceStub(service, channel);
+              R result = callable.call(instance);
+              byte[] region = channel.getLastRegion();
+              if (callback != null) {
+                callback.update(region, r, result);
+              }
+              return result;
+            }
+          });
       futures.put(r, future);
     }
     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
@@ -1113,6 +1210,9 @@ public class HTable implements Table {
     return tableName + ";" + connection;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public <R extends Message> Map<byte[], R> batchCoprocessorService(
       Descriptors.MethodDescriptor methodDescriptor, Message request,
@@ -1121,13 +1221,14 @@ public class HTable implements Table {
         Bytes.BYTES_COMPARATOR));
     batchCoprocessorService(methodDescriptor, request, startKey, endKey, 
responsePrototype,
         new Callback<R>() {
-      @Override
-      public void update(byte[] region, byte[] row, R result) {
-        if (region != null) {
-          results.put(region, result);
-        }
-      }
-    });
+
+          @Override
+          public void update(byte[] region, byte[] row, R result) {
+            if (region != null) {
+              results.put(region, result);
+            }
+          }
+        });
     return results;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
index ae62255..66d3c21 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
@@ -21,24 +21,16 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-
 /**
  * A RetryingCallable for master operations.
  * @param <V> return type
  */
-// Like RegionServerCallable
 abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
   protected ClusterConnection connection;
   protected MasterKeepAliveConnection master;
-  private final PayloadCarryingRpcController rpcController;
 
-  MasterCallable(final Connection connection,
-      final RpcControllerFactory rpcConnectionFactory) {
+  public MasterCallable(final Connection connection) {
     this.connection = (ClusterConnection) connection;
-    this.rpcController = rpcConnectionFactory.newController();
   }
 
   @Override
@@ -67,31 +59,4 @@ abstract class MasterCallable<V> implements 
RetryingCallable<V>, Closeable {
   public long sleep(long pause, int tries) {
     return ConnectionUtils.getPauseTime(pause, tries);
   }
-
-  /**
-   * Override that changes Exception from {@link Exception} to {@link 
IOException}. It also does
-   * setup of an rpcController and calls through to the unimplemented
-   * call(PayloadCarryingRpcController) method; implement this method to add 
your rpc invocation.
-   */
-  @Override
-  // Same trick as in RegionServerCallable so users don't have to copy/paste 
so much boilerplate
-  // and so we contain references to protobuf. We can't set priority on the 
rpcController as
-  // we do in RegionServerCallable because we don't always have a Table when 
we call.
-  public V call(int callTimeout) throws IOException {
-    try {
-      this.rpcController.setCallTimeout(callTimeout);
-      return call(this.rpcController);
-    } catch (Exception e) {
-      throw ProtobufUtil.handleRemoteException(e);
-    }
-  }
-
-  /**
-   * Run RPC call.
-   * @param rpcController PayloadCarryingRpcController is a mouthful but it at 
a minimum is a
-   * facade on protobuf so we don't have to put protobuf everywhere; we can 
keep it behind this
-   * class.
-   * @throws Exception
-   */
-  protected abstract V call(PayloadCarryingRpcController rpcController) throws 
Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
index 47693f4..e445b78 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
@@ -33,7 +33,8 @@ import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
  * against the master on the MasterProtos.MasterService.BlockingInterface; but 
not by
  * final user code. Hence it's package protected.
  */
-interface MasterKeepAliveConnection extends 
MasterProtos.MasterService.BlockingInterface {
+interface MasterKeepAliveConnection
+extends MasterProtos.MasterService.BlockingInterface {
   // Do this instead of implement Closeable because closeable returning IOE is 
PITA.
   void close();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index a3162f4..e764ceb 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,9 +30,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -42,14 +41,14 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
 
 /**
  * Callable that handles the <code>multi</code> method call going against a 
single
- * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a
- * RegionServerCallable that goes against multiple regions).
+ * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is 
not a
+ * {@link RegionServerCallable} that goes against multiple regions.
  * @param <R>
  */
-@InterfaceAudience.Private
 class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
@@ -80,7 +79,7 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
   }
 
   @Override
-  protected MultiResponse call(PayloadCarryingRpcController controller) throws 
Exception {
+  public MultiResponse call(int callTimeout) throws IOException {
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -99,8 +98,10 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
       regionActionBuilder.clear();
       regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
           HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, 
regionName));
+
+
       if (this.cellBlock) {
-        // Pre-size. Presume at least a KV per Action.  There are likely more.
+        // Presize.  Presume at least a KV per Action.  There are likely more.
         if (cells == null) cells = new 
ArrayList<CellScannable>(countOfActions);
         // Send data in cellblocks. The call to buildNoDataMultiRequest will 
skip RowMutations.
         // They have already been handled above. Guess at count of cells
@@ -115,18 +116,18 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
 
     // Controller optionally carries cell data over the proxy/service boundary 
and also
     // optionally ferries cell response data back out again.
-    PayloadCarryingRpcController payloadCarryingRpcController = null;
-    if (cells != null) {
-      // Cast. Will fail if we have been passed wrong RpcController type.
-      payloadCarryingRpcController = (PayloadCarryingRpcController)controller;
-      
payloadCarryingRpcController.setCellScanner(CellUtil.createCellScanner(cells));
-    }
+    if (cells != null) 
controller.setCellScanner(CellUtil.createCellScanner(cells));
+    controller.setPriority(getTableName());
+    controller.setCallTimeout(callTimeout);
     ClientProtos.MultiResponse responseProto;
     ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
-    responseProto = getStub().multi(controller, requestProto);
+    try {
+      responseProto = getStub().multi(controller, requestProto);
+    } catch (ServiceException e) {
+      throw ProtobufUtil.getRemoteException(e);
+    }
     if (responseProto == null) return null; // Occurs on cancel
-    return ResponseConverter.getResults(requestProto, responseProto,
-        payloadCarryingRpcController ==  null? null: 
payloadCarryingRpcController.cellScanner());
+    return ResponseConverter.getResults(requestProto, responseProto, 
controller.cellScanner());
   }
 
   /**
@@ -150,4 +151,4 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
   ServerName getServerName() {
     return location.getServerName();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
index 83d857b..d94f069 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
@@ -16,51 +16,33 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 
 /**
- * This class is used to unify HTable calls with AsyncProcess Framework. 
HTable can use
- * AsyncProcess directly though this class. Also adds global timeout tracking 
on top of
- * RegionServerCallable and implements Cancellable.
+ * This class is used to unify HTable calls with AsyncProcess Framework.
+ * HTable can use AsyncProcess directly though this class.
  */
 @InterfaceAudience.Private
-abstract class PayloadCarryingServerCallable<T> extends RegionServerCallable<T>
-    implements Cancellable {
-  private final RetryingTimeTracker tracker = new RetryingTimeTracker();
-
-  PayloadCarryingServerCallable(Connection connection, TableName tableName, 
byte[] row,
-      RpcControllerFactory rpcControllerFactory) {
-    super(connection, rpcControllerFactory, tableName, row);
-  }
+public abstract class PayloadCarryingServerCallable<T>
+    extends RegionServerCallable<T> implements Cancellable {
+  protected PayloadCarryingRpcController controller;
 
-  /* Override so can mess with the callTimeout.
-   * (non-Javadoc)
-   * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
-   */
-  @Override
-  public T call(int callTimeout) throws IOException {
-    // It is expected (it seems) that tracker.start can be called multiple 
times (on each trip
-    // through the call when retrying). Also, we can call start and no need of 
a stop.
-    this.tracker.start();
-    int remainingTime = tracker.getRemainingTime(callTimeout);
-    if (remainingTime == 0) {
-      throw new DoNotRetryIOException("Timeout for mutate row");
-    }
-    return super.call(remainingTime);
+  public PayloadCarryingServerCallable(Connection connection, TableName 
tableName, byte[] row,
+    RpcControllerFactory rpcControllerFactory) {
+    super(connection, tableName, row);
+    this.controller = rpcControllerFactory.newController();
   }
 
   @Override
   public void cancel() {
-    getRpcController().startCancel();
+    controller.startCancel();
   }
 
   @Override
   public boolean isCancelled() {
-    return getRpcController().isCanceled();
+    return controller.isCanceled();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
index 4e347dd..54c93a0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
@@ -27,30 +27,31 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Similar to RegionServerCallable but for the AdminService interface. This 
service callable
+ * Similar to {@link RegionServerCallable} but for the AdminService interface. 
This service callable
  * assumes a Table and row and thus does region locating similar to 
RegionServerCallable.
- * Works against Admin stub rather than Client stub.
  */
 
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
   justification="stub used by ipc")
 @InterfaceAudience.Private
 public abstract class RegionAdminServiceCallable<T> implements 
RetryingCallable<T> {
-  protected AdminService.BlockingInterface stub;
-  protected final RpcControllerFactory rpcControllerFactory;
-  private PayloadCarryingRpcController controller = null;
 
   protected final ClusterConnection connection;
+
+  protected final RpcControllerFactory rpcControllerFactory;
+
+  protected AdminService.BlockingInterface stub;
+
   protected HRegionLocation location;
+
   protected final TableName tableName;
   protected final byte[] row;
   protected final int replicaId;
+
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   public RegionAdminServiceCallable(ClusterConnection connection,
@@ -81,13 +82,16 @@ public abstract class RegionAdminServiceCallable<T> 
implements RetryingCallable<
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
+
     if (reload || location == null) {
       location = getLocation(!reload);
     }
+
     if (location == null) {
       // With this exception, there will be a retry.
       throw new HBaseIOException(getExceptionMessage());
     }
+
     this.setStub(connection.getAdmin(location.getServerName()));
   }
 
@@ -163,39 +167,7 @@ public abstract class RegionAdminServiceCallable<T> 
implements RetryingCallable<
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
     }
-    return rl;
-  }
-
-  /**
-   * Override that changes Exception from {@link Exception} to {@link 
IOException}. It also does
-   * setup of an rpcController and calls through to the unimplemented
-   * call(PayloadCarryingRpcController) method; implement this method to add 
your rpc invocation.
-   */
-  @Override
-  // Same trick as in RegionServerCallable so users don't have to copy/paste 
so much boilerplate
-  // and so we contain references to protobuf. We can't set priority on the 
rpcController as
-  // we do in RegionServerCallable because we don't always have a Table when 
we call.
-  public T call(int callTimeout) throws IOException {
-    this.controller = rpcControllerFactory.newController();
-    this.controller.setPriority(this.tableName);
-    this.controller.setCallTimeout(callTimeout);
-    try {
-      return call(this.controller);
-    } catch (Exception e) {
-      throw ProtobufUtil.handleRemoteException(e);
-    }
-  }
 
-  PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() {
-    return this.controller;
+    return rl;
   }
-
-  /**
-   * Run RPC call.
-   * @param rpcController PayloadCarryingRpcController is a mouthful but it at 
a minimum is a
-   * facade on protobuf so we don't have to put protobuf everywhere; we can 
keep it behind this
-   * class.
-   * @throws Exception
-   */
-  protected abstract T call(PayloadCarryingRpcController rpcController) throws 
Exception;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index 861b375..d878bae 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,20 +24,12 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 
 /**
- * Implementations make an rpc call against a RegionService via a protobuf 
Service.
- * Implement #rpcCall(RpcController) and then call {@link #call(int)} to
- * trigger the rpc. The {@link #call(int)} eventually invokes your
- * #rpcCall(RpcController) meanwhile saving you having to write a bunch of
- * boilerplate. The {@link #call(int)} implementation is from {@link 
RpcRetryingCaller} so rpcs are
- * retried on fail.
- *
- * <p>TODO: this class is actually tied to one region, because most of the 
paths make use of
+ * Implementations call a RegionServer and implement {@link #call(int)}.
+ * Passed to a {@link RpcRetryingCaller} so we retry on fail.
+ * TODO: this class is actually tied to one region, because most of the paths 
make use of
  *       the regioninfo part of location when building requests. The only 
reason it works for
  *       multi-region requests (e.g. batch) is that they happen to not use the 
region parts.
  *       This could be done cleaner (e.g. having a generic parameter and 2 
derived classes,
@@ -44,27 +37,18 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
  * @param <T> the class that the ServerCallable handles
  */
 @InterfaceAudience.Private
-public abstract class RegionServerCallable<T> extends 
AbstractRegionServerCallable<T> {
+public abstract class RegionServerCallable<T> extends 
AbstractRegionServerCallable<T> implements
+    RetryingCallable<T> {
+
   private ClientService.BlockingInterface stub;
-  private final PayloadCarryingRpcController rpcController;
 
   /**
    * @param connection Connection to use.
    * @param tableName Table name to which <code>row</code> belongs.
    * @param row The row we want in <code>tableName</code>.
    */
-  public RegionServerCallable(Connection connection, RpcControllerFactory 
rpcControllerFactory,
-      TableName tableName, byte [] row) {
-    this(connection, rpcControllerFactory.newController(), tableName, row);
-  }
-
-  public RegionServerCallable(Connection connection, 
PayloadCarryingRpcController rpcController,
-      TableName tableName, byte [] row) {
+  public RegionServerCallable(Connection connection, TableName tableName, byte 
[] row) {
     super(connection, tableName, row);
-    this.rpcController = rpcController;
-    if (this.rpcController != null) {
-      this.rpcController.setPriority(tableName);
-    }
   }
 
   void setClientByServiceName(ServerName service) throws IOException {
@@ -85,42 +69,4 @@ public abstract class RegionServerCallable<T> extends 
AbstractRegionServerCallab
   void setStub(final ClientService.BlockingInterface stub) {
     this.stub = stub;
   }
-
-  /**
-   * Override that changes Exception from {@link Exception} to {@link 
IOException}. It also does
-   * setup of an rpcController and calls through to the unimplemented
-   * call(PayloadCarryingRpcController) method; implement this method to add 
your rpc invocation.
-   */
-  @Override
-  public T call(int callTimeout) throws IOException {
-    if (this.rpcController != null) {
-      this.rpcController.setCallTimeout(callTimeout);
-    }
-    try {
-      return call(this.rpcController);
-    } catch (Exception e) {
-      throw ProtobufUtil.handleRemoteException(e);
-    }
-  }
-
-  /**
-   * Run RPC call.
-   * @param rpcController PayloadCarryingRpcController is a mouthful but it at 
a minimum is a
-   * facade on protobuf so we don't have to put protobuf everywhere; we can 
keep it behind this
-   * class.
-   * @throws Exception
-   */
-  protected abstract T call(PayloadCarryingRpcController rpcController) throws 
Exception;
-
-  public PayloadCarryingRpcController getRpcController() {
-    return this.rpcController;
-  }
-
-  long getNonceGroup() {
-    return getConnection().getNonceGenerator().getNonceGroup();
-  }
-
-  long getNewNonce() {
-    return getConnection().getNonceGenerator().newNonce();
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index b9438e6..24288e6 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * Tracks the amount of time remaining for an operation.
  */
 class RetryingTimeTracker {
+
   private long globalStartTime = -1;
 
   public void start() {
@@ -37,19 +38,16 @@ class RetryingTimeTracker {
       if (callTimeout == Integer.MAX_VALUE) {
         return Integer.MAX_VALUE;
       }
-      long remaining = EnvironmentEdgeManager.currentTime() - 
this.globalStartTime;
-      long remainingTime = callTimeout - remaining;
+      int remainingTime = (int) (
+        callTimeout -
+        (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
       if (remainingTime < 1) {
         // If there is no time left, we're trying anyway. It's too late.
         // 0 means no timeout, and it's not the intent here. So we secure both 
cases by
         // resetting to the minimum.
         remainingTime = 1;
       }
-      if (remainingTime > Integer.MAX_VALUE) {
-        throw new RuntimeException("remainingTime=" + remainingTime +
-            " which is > Integer.MAX_VALUE");
-      }
-      return (int)remainingTime;
+      return remainingTime;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index 644337d..0c2d345 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -176,9 +176,9 @@ public class ReversedScannerCallable extends 
ScannerCallable {
 
   @Override
   public ScannerCallable getScannerCallableForReplica(int id) {
-    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), 
this.tableName,
+    ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, 
this.tableName,
         this.getScan(), this.scanMetrics, this.locateStartRow, 
controllerFactory, id);
     r.setCaching(this.getCaching());
     return r;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
deleted file mode 100644
index 68a4aa2..0000000
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-
-/**
- * A RetryingCallable for RPC connection operations.
- * @param <V> return type
- */
-abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, 
Closeable {
-  @Override
-  public void prepare(boolean reload) throws IOException {
-  }
-
-  @Override
-  public void close() throws IOException {
-  }
-
-  @Override
-  public void throwable(Throwable t, boolean retrying) {
-  }
-
-  @Override
-  public String getExceptionMessageAdditionalDetail() {
-    return "";
-  }
-
-  @Override
-  public long sleep(long pause, int tries) {
-    return ConnectionUtils.getPauseTime(pause, tries);
-  }
-
-  @Override
-  // Same trick as in RegionServerCallable so users don't have to copy/paste 
so much boilerplate
-  // and so we contain references to protobuf.
-  public V call(int callTimeout) throws IOException {
-    try {
-      return rpcCall(callTimeout);
-    } catch (Exception e) {
-      throw ProtobufUtil.handleRemoteException(e);
-    }
-  }
-
-  protected abstract V rpcCall(int callTimeout) throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index 2b2e4c8..b4cd2ef 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -22,6 +22,9 @@ import 
org.apache.hadoop.hbase.classification.InterfaceStability;
 
 import java.io.IOException;
 
+/**
+ *
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RpcRetryingCaller<T> {
@@ -49,4 +52,4 @@ public interface RpcRetryingCaller<T> {
    */
   T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
   throws IOException, RuntimeException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index f92aeae..1c723c5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -36,7 +36,6 @@ public class RpcRetryingCallerFactory {
   private final int rpcTimeout;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
-  /* These below data members are UNUSED!!!*/
   private final boolean enableBackPressure;
   private ServerStatisticTracker stats;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 2785648..65dbb10 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import com.google.protobuf.ServiceException;
+
 
 /**
  * Caller that goes to replica if the primary region does no answer within a 
configurable
@@ -53,6 +57,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 public class RpcRetryingCallerWithReadReplicas {
+  private static final Log LOG = 
LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
+
   protected final ExecutorService pool;
   protected final ClusterConnection cConnection;
   protected final Configuration conf;
@@ -92,7 +98,7 @@ public class RpcRetryingCallerWithReadReplicas {
     private final PayloadCarryingRpcController controller;
 
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
-      super(RpcRetryingCallerWithReadReplicas.this.cConnection, 
rpcControllerFactory,
+      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
       this.id = id;
       this.location = location;
@@ -135,20 +141,28 @@ public class RpcRetryingCallerWithReadReplicas {
     }
 
     @Override
-    protected Result call(PayloadCarryingRpcController controller) throws 
Exception {
+    public Result call(int callTimeout) throws Exception {
       if (controller.isCanceled()) return null;
+
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
+
       byte[] reg = location.getRegionInfo().getRegionName();
+
       ClientProtos.GetRequest request =
           RequestConverter.buildGetRequest(reg, get);
       controller.setCallTimeout(callTimeout);
-      ClientProtos.GetResponse response = getStub().get(controller, request);
-      if (response == null) {
-        return null;
+
+      try {
+        ClientProtos.GetResponse response = getStub().get(controller, request);
+        if (response == null) {
+          return null;
+        }
+        return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
       }
-      return ProtobufUtil.toResult(response.getResult(), 
controller.cellScanner());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 1689d11..72d69ec 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -52,6 +52,9 @@ import 
org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
 /**
  * Scanner operations such as create, next, etc.
  * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying 
caller such as
@@ -71,6 +74,7 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
   protected boolean renew = false;
   private Scan scan;
   private int caching = 1;
+  protected final ClusterConnection cConnection;
   protected ScanMetrics scanMetrics;
   private boolean logScannerActivity = false;
   private int logCutOffLatency = 1000;
@@ -121,8 +125,9 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
    */
   public ScannerCallable(ClusterConnection connection, TableName tableName, 
Scan scan,
       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int 
id) {
-    super(connection, rpcControllerFactory, tableName, scan.getStartRow());
+    super(connection, tableName, scan.getStartRow());
     this.id = id;
+    this.cConnection = connection;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
     Configuration conf = connection.getConfiguration();
@@ -180,16 +185,25 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
     }
   }
 
-  protected Result [] call(PayloadCarryingRpcController controller) throws 
Exception {
+
+  @Override
+  public Result [] call(int callTimeout) throws IOException {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-    if (this.closed) {
-      if (this.scannerId != -1) {
+
+    if (controller == null) {
+      controller = controllerFactory.newController();
+      controller.setPriority(getTableName());
+      controller.setCallTimeout(callTimeout);
+    }
+
+    if (closed) {
+      if (scannerId != -1) {
         close();
       }
     } else {
-      if (this.scannerId == -1L) {
+      if (scannerId == -1L) {
         this.scannerId = openScanner();
       } else {
         Result [] rrs = null;
@@ -198,56 +212,61 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
         setHeartbeatMessage(false);
         try {
           incRPCcallsMetrics();
-          request = RequestConverter.buildScanRequest(scannerId, caching, 
false, nextCallSeq,
+          request =
+              RequestConverter.buildScanRequest(scannerId, caching, false, 
nextCallSeq,
                 this.scanMetrics != null, renew);
           ScanResponse response = null;
-          response = getStub().scan(controller, request);
-          // Client and RS maintain a nextCallSeq number during the scan. 
Every next() call
-          // from client to server will increment this number in both sides. 
Client passes this
-          // number along with the request and at RS side both the incoming 
nextCallSeq and its
-          // nextCallSeq will be matched. In case of a timeout this increment 
at the client side
-          // should not happen. If at the server side fetching of next batch 
of data was over,
-          // there will be mismatch in the nextCallSeq number. Server will 
throw
-          // OutOfOrderScannerNextException and then client will reopen the 
scanner with startrow
-          // as the last successfully retrieved row.
-          // See HBASE-5974
-          nextCallSeq++;
-          long timestamp = System.currentTimeMillis();
-          setHeartbeatMessage(response.hasHeartbeatMessage() && 
response.getHeartbeatMessage());
-          // Results are returned via controller
-          CellScanner cellScanner = controller.cellScanner();
-          rrs = ResponseConverter.getResults(cellScanner, response);
-          if (logScannerActivity) {
-            long now = System.currentTimeMillis();
-            if (now - timestamp > logCutOffLatency) {
-              int rows = rrs == null ? 0 : rrs.length;
-              LOG.info("Took " + (now-timestamp) + "ms to fetch "
+          try {
+            response = getStub().scan(controller, request);
+            // Client and RS maintain a nextCallSeq number during the scan. 
Every next() call
+            // from client to server will increment this number in both sides. 
Client passes this
+            // number along with the request and at RS side both the incoming 
nextCallSeq and its
+            // nextCallSeq will be matched. In case of a timeout this 
increment at the client side
+            // should not happen. If at the server side fetching of next batch 
of data was over,
+            // there will be mismatch in the nextCallSeq number. Server will 
throw
+            // OutOfOrderScannerNextException and then client will reopen the 
scanner with startrow
+            // as the last successfully retrieved row.
+            // See HBASE-5974
+            nextCallSeq++;
+            long timestamp = System.currentTimeMillis();
+            setHeartbeatMessage(response.hasHeartbeatMessage() && 
response.getHeartbeatMessage());
+            // Results are returned via controller
+            CellScanner cellScanner = controller.cellScanner();
+            rrs = ResponseConverter.getResults(cellScanner, response);
+            if (logScannerActivity) {
+              long now = System.currentTimeMillis();
+              if (now - timestamp > logCutOffLatency) {
+                int rows = rrs == null ? 0 : rrs.length;
+                LOG.info("Took " + (now-timestamp) + "ms to fetch "
                   + rows + " rows from scanner=" + scannerId);
+              }
             }
-          }
-          updateServerSideMetrics(response);
-          // moreResults is only used for the case where a filter exhausts all 
elements
-          if (response.hasMoreResults() && !response.getMoreResults()) {
-            this.scannerId = -1L;
-            this.closed = true;
-            // Implied that no results were returned back, either.
-            return null;
-          }
-          // moreResultsInRegion explicitly defines when a RS may choose to 
terminate a batch due
-          // to size or quantity of results in the response.
-          if (response.hasMoreResultsInRegion()) {
-            // Set what the RS said
-            setHasMoreResultsContext(true);
-            setServerHasMoreResults(response.getMoreResultsInRegion());
-          } else {
-            // Server didn't respond whether it has more results or not.
-            setHasMoreResultsContext(false);
+            updateServerSideMetrics(response);
+            // moreResults is only used for the case where a filter exhausts 
all elements
+            if (response.hasMoreResults() && !response.getMoreResults()) {
+              scannerId = -1L;
+              closed = true;
+              // Implied that no results were returned back, either.
+              return null;
+            }
+            // moreResultsInRegion explicitly defines when a RS may choose to 
terminate a batch due
+            // to size or quantity of results in the response.
+            if (response.hasMoreResultsInRegion()) {
+              // Set what the RS said
+              setHasMoreResultsContext(true);
+              setServerHasMoreResults(response.getMoreResultsInRegion());
+            } else {
+              // Server didn't respond whether it has more results or not.
+              setHasMoreResultsContext(false);
+            }
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
           }
           updateResultsMetrics(rrs);
         } catch (IOException e) {
           if (logScannerActivity) {
-            LOG.info("Got exception making request " + 
ProtobufUtil.toText(request) + " to " +
-                getLocation(), e);
+            LOG.info("Got exception making request " + 
TextFormat.shortDebugString(request)
+              + " to " + getLocation(), e);
           }
           IOException ioe = e;
           if (e instanceof RemoteException) {
@@ -256,9 +275,9 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
             try {
               HRegionLocation location =
-                  getConnection().relocateRegion(getTableName(), 
scan.getStartRow());
-              LOG.info("Scanner=" + scannerId + " expired, current region 
location is " +
-                  location.toString());
+                getConnection().relocateRegion(getTableName(), 
scan.getStartRow());
+              LOG.info("Scanner=" + scannerId
+                + " expired, current region location is " + 
location.toString());
             } catch (Throwable t) {
               LOG.info("Failed to relocate region", t);
             }
@@ -357,8 +376,8 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
           RequestConverter.buildScanRequest(this.scannerId, 0, true, 
this.scanMetrics != null);
       try {
         getStub().scan(controller, request);
-      } catch (Exception e) {
-        throw ProtobufUtil.handleRemoteException(e);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
       }
     } catch (IOException e) {
       LOG.warn("Ignore, probably already closed", e);
@@ -368,8 +387,10 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
 
   protected long openScanner() throws IOException {
     incRPCcallsMetrics();
-    ScanRequest request = RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
+    ScanRequest request =
+      RequestConverter.buildScanRequest(
+        getLocation().getRegionInfo().getRegionName(),
+        this.scan, 0, false);
     try {
       ScanResponse response = getStub().scan(controller, request);
       long id = response.getScannerId();
@@ -378,8 +399,8 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
           + " on region " + getLocation().toString());
       }
       return id;
-    } catch (Exception e) {
-      throw ProtobufUtil.handleRemoteException(e);
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
     }
   }
 
@@ -422,6 +443,11 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
     return caching;
   }
 
+  @Override
+  public ClusterConnection getConnection() {
+    return cConnection;
+  }
+
   /**
    * Set the number of rows that will be fetched on next
    * @param caching the number of rows for caching
@@ -462,4 +488,4 @@ public class ScannerCallable extends 
RegionServerCallable<Result[]> {
   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) 
{
     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index d6896e1..7b1547d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -22,9 +22,6 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -41,35 +38,41 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * Client proxy for SecureBulkLoadProtocol
  */
 @InterfaceAudience.Private
 public class SecureBulkLoadClient {
   private Table table;
-  private final RpcControllerFactory rpcControllerFactory;
 
-  public SecureBulkLoadClient(final Configuration conf, Table table) {
+  public SecureBulkLoadClient(Table table) {
     this.table = table;
-    this.rpcControllerFactory = new RpcControllerFactory(conf);
   }
 
   public String prepareBulkLoad(final Connection conn) throws IOException {
     try {
-      RegionServerCallable<String> callable = new 
RegionServerCallable<String>(conn,
-          this.rpcControllerFactory, table.getName(), 
HConstants.EMPTY_START_ROW) {
-        @Override
-        protected String call(PayloadCarryingRpcController controller) throws 
Exception {
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          RegionSpecifier region =
-              
RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName);
-          PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
-              .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
-              .setRegion(region).build();
-          PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, 
request);
-          return response.getBulkToken();
-        }
-      };
+      RegionServerCallable<String> callable =
+          new RegionServerCallable<String>(conn, table.getName(), 
HConstants.EMPTY_START_ROW) {
+            @Override
+            public String call(int callTimeout) throws IOException {
+              byte[] regionName = 
getLocation().getRegionInfo().getRegionName();
+              RegionSpecifier region =
+                  RequestConverter
+                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName);
+              try {
+                PrepareBulkLoadRequest request =
+                    PrepareBulkLoadRequest.newBuilder()
+                        
.setTableName(ProtobufUtil.toProtoTableName(table.getName()))
+                        .setRegion(region).build();
+                PrepareBulkLoadResponse response = 
getStub().prepareBulkLoad(null, request);
+                return response.getBulkToken();
+              } catch (ServiceException se) {
+                throw ProtobufUtil.getRemoteException(se);
+              }
+            }
+          };
       return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), 
null)
           .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -79,19 +82,24 @@ public class SecureBulkLoadClient {
 
   public void cleanupBulkLoad(final Connection conn, final String bulkToken) 
throws IOException {
     try {
-      RegionServerCallable<Void> callable = new 
RegionServerCallable<Void>(conn,
-          this.rpcControllerFactory, table.getName(), 
HConstants.EMPTY_START_ROW) {
-        @Override
-        protected Void call(PayloadCarryingRpcController controller) throws 
Exception {
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          RegionSpecifier region = RequestConverter.buildRegionSpecifier(
-              RegionSpecifierType.REGION_NAME, regionName);
-          CleanupBulkLoadRequest request =
-              
CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
-          getStub().cleanupBulkLoad(null, request);
-          return null;
-        }
-      };
+      RegionServerCallable<Void> callable =
+          new RegionServerCallable<Void>(conn, table.getName(), 
HConstants.EMPTY_START_ROW) {
+            @Override
+            public Void call(int callTimeout) throws IOException {
+              byte[] regionName = 
getLocation().getRegionInfo().getRegionName();
+              RegionSpecifier region = RequestConverter.buildRegionSpecifier(
+                RegionSpecifierType.REGION_NAME, regionName);
+              try {
+                CleanupBulkLoadRequest request =
+                    CleanupBulkLoadRequest.newBuilder().setRegion(region)
+                        .setBulkToken(bulkToken).build();
+                getStub().cleanupBulkLoad(null, request);
+              } catch (ServiceException se) {
+                throw ProtobufUtil.getRemoteException(se);
+              }
+              return null;
+            }
+          };
       RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
           .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -122,12 +130,12 @@ public class SecureBulkLoadClient {
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
       return response.getLoaded();
-    } catch (Exception se) {
-      throw ProtobufUtil.handleRemoteException(se);
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
     }
   }
 
   public Path getStagingPath(String bulkToken, byte[] family) throws 
IOException {
     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), 
bulkToken, family);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
index a6384e3..6fae5cb 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
@@ -77,4 +77,5 @@ public class MasterCoprocessorRpcChannel extends 
SyncCoprocessorRpcChannel {
     }
     return response;
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index 6c290a6..f4f18b3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -17,39 +17,24 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
 
 /**
  * Optionally carries Cells across the proxy/service interface down into ipc. 
On its
- * way out it optionally carries a set of result Cell data. We stick the Cells 
here when we want
- * to avoid having to protobuf them (for performance reasons). This class is 
used ferrying data
- * across the proxy/protobuf service chasm. Also does call timeout. Used by 
client and server
- * ipc'ing.
+ * way out it optionally carries a set of result Cell data.  We stick the 
Cells here when we want
+ * to avoid having to protobuf them.  This class is used ferrying data across 
the proxy/protobuf
+ * service chasm.  Used by client and server ipc'ing.
  */
 @InterfaceAudience.Private
-public class PayloadCarryingRpcController implements RpcController, 
CellScannable {
-  /**
-   * The time, in ms before the call should expire.
-   */
-  protected volatile Integer callTimeout;
-  protected volatile boolean cancelled = false;
-  protected final AtomicReference<RpcCallback<Object>> cancellationCb = new 
AtomicReference<>(null);
-  protected final AtomicReference<RpcCallback<IOException>> failureCb = new 
AtomicReference<>(null);
-  private IOException exception;
+public class PayloadCarryingRpcController
+    extends TimeLimitedRpcController implements CellScannable {
 
   public static final int PRIORITY_UNSET = -1;
   /**
@@ -108,123 +93,15 @@ public class PayloadCarryingRpcController implements 
RpcController, CellScannabl
   }
 
   /**
-   * @param regionName RegionName. If hbase:meta, we'll set high priority.
-   */
-  public void setPriority(final byte [] regionName) {
-    if (isMetaRegion(regionName)) {
-      setPriority(TableName.META_TABLE_NAME);
-    }
-  }
-
-  private static boolean isMetaRegion(final byte[] regionName) {
-    return Bytes.equals(regionName, 
HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
-        || Bytes.equals(regionName, 
HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
-  }
-
-  /**
    * @return The priority of this request
    */
   public int getPriority() {
     return priority;
   }
 
-  @Override
-  public void reset() {
+  @Override public void reset() {
+    super.reset();
     priority = 0;
     cellScanner = null;
-    exception = null;
-    cancelled = false;
-    failureCb.set(null);
-    cancellationCb.set(null);
-    callTimeout = null;
-  }
-
-  public int getCallTimeout() {
-    if (callTimeout != null) {
-      return callTimeout;
-    } else {
-      return 0;
-    }
-  }
-
-  public void setCallTimeout(int callTimeout) {
-    this.callTimeout = callTimeout;
-  }
-
-  public boolean hasCallTimeout(){
-    return callTimeout != null;
-  }
-
-  @Override
-  public String errorText() {
-    if (exception != null) {
-      return exception.getMessage();
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * For use in async rpc clients
-   * @return true if failed
-   */
-  @Override
-  public boolean failed() {
-    return this.exception != null;
-  }
-
-  @Override
-  public boolean isCanceled() {
-    return cancelled;
-  }
-
-  @Override
-  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
-    this.cancellationCb.set(cancellationCb);
-    if (this.cancelled) {
-      cancellationCb.run(null);
-    }
-  }
-
-  /**
-   * Notify a callback on error.
-   * For use in async rpc clients
-   *
-   * @param failureCb the callback to call on error
-   */
-  public void notifyOnFail(RpcCallback<IOException> failureCb) {
-    this.failureCb.set(failureCb);
-    if (this.exception != null) {
-      failureCb.run(this.exception);
-    }
-  }
-
-  @Override
-  public void setFailed(String reason) {
-    this.exception = new IOException(reason);
-    if (this.failureCb.get() != null) {
-      this.failureCb.get().run(this.exception);
-    }
-  }
-
-  /**
-   * Set failed with an exception to pass on.
-   * For use in async rpc clients
-   *
-   * @param e exception to set with
-   */
-  public void setFailed(IOException e) {
-    this.exception = e;
-    if (this.failureCb.get() != null) {
-      this.failureCb.get().run(this.exception);
-    }
-  }
-
-  @Override
-  public void startCancel() {
-    cancelled = true;
-    if (cancellationCb.get() != null) {
-      cancellationCb.get().run(null);
-    }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index dbc9041..55d6375 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -76,23 +76,30 @@ public class RegionCoprocessorRpcChannel extends 
SyncCoprocessorRpcChannel {
       Descriptors.MethodDescriptor method, Message request, Message 
responsePrototype)
           throws IOException {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Call: " + method.getName() + ", " + request.toString());
+      LOG.trace("Call: "+method.getName()+", "+request.toString());
     }
+
     if (row == null) {
       throw new IllegalArgumentException("Missing row property for remote 
region location");
     }
+
+    final RpcController rpcController = controller == null
+        ? rpcControllerFactory.newController() : controller;
+
     final ClientProtos.CoprocessorServiceCall call =
         CoprocessorRpcUtils.buildServiceCall(row, method, request);
     RegionServerCallable<CoprocessorServiceResponse> callable =
-        new RegionServerCallable<CoprocessorServiceResponse>(connection,
-          controller == null? this.rpcControllerFactory.newController():
-            (PayloadCarryingRpcController)controller,
-          table, row) {
+        new RegionServerCallable<CoprocessorServiceResponse>(connection, 
table, row) {
       @Override
-      protected CoprocessorServiceResponse call(PayloadCarryingRpcController 
controller)
-      throws Exception {
+      public CoprocessorServiceResponse call(int callTimeout) throws Exception 
{
+        if (rpcController instanceof PayloadCarryingRpcController) {
+          ((PayloadCarryingRpcController) 
rpcController).setPriority(tableName);
+        }
+        if (rpcController instanceof TimeLimitedRpcController) {
+          ((TimeLimitedRpcController) 
rpcController).setCallTimeout(callTimeout);
+        }
         byte[] regionName = getLocation().getRegionInfo().getRegionName();
-        return ProtobufUtil.execService(getRpcController(), getStub(), call, 
regionName);
+        return ProtobufUtil.execService(rpcController, getStub(), call, 
regionName);
       }
     };
     CoprocessorServiceResponse result = 
rpcCallerFactory.<CoprocessorServiceResponse> newCaller()

Reply via email to