http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index fa18bd8..29650ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -18,6 +18,10 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -28,7 +32,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -66,6 +69,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.security.SecurityCapability;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel;
@@ -179,8 +183,6 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * HBaseAdmin is no longer a client API. It is marked 
InterfaceAudience.Private indicating that
  * this is an HBase-internal class as defined in
@@ -209,6 +211,10 @@ public class HBaseAdmin implements Admin {
   private volatile Configuration conf;
   private final long pause;
   private final int numRetries;
+  // Some operations can take a long time such as disable of big table.
+  // numRetries is for 'normal' stuff... Multiply by this factor when
+  // want to wait a long time.
+  private final int retryLongerMultiplier;
   private final int syncWaitTimeout;
   private boolean aborted;
   private int operationTimeout;
@@ -233,6 +239,8 @@ public class HBaseAdmin implements Admin {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    this.retryLongerMultiplier = this.conf.getInt(
+        "hbase.client.retries.longer.multiplier", 10);
     this.operationTimeout = 
this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
@@ -254,7 +262,7 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public boolean isAborted() {
+  public boolean isAborted(){
     return this.aborted;
   }
 
@@ -266,16 +274,18 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public Future<Boolean> abortProcedureAsync(final long procId, final boolean 
mayInterruptIfRunning)
-  throws IOException {
+  public Future<Boolean> abortProcedureAsync(
+      final long procId,
+      final boolean mayInterruptIfRunning) throws IOException {
     Boolean abortProcResponse = executeCallable(
-      new MasterCallable<AbortProcedureResponse>(getConnection(), 
getRpcControllerFactory()) {
+      new MasterCallable<AbortProcedureResponse>(getConnection()) {
         @Override
-        protected AbortProcedureResponse call(PayloadCarryingRpcController 
rpcController)
-        throws Exception {
+        public AbortProcedureResponse call(int callTimeout) throws 
ServiceException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setCallTimeout(callTimeout);
           AbortProcedureRequest abortProcRequest =
               AbortProcedureRequest.newBuilder().setProcId(procId).build();
-          return master.abortProcedure(rpcController, abortProcRequest);
+          return master.abortProcedure(controller, abortProcRequest);
         }
       }).getIsProcedureAborted();
 
@@ -314,9 +324,9 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean tableExists(final TableName tableName) throws IOException {
-    return executeCallable(new RpcRetryingCallable<Boolean>() {
+    return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean rpcCall(int callTimeout) throws Exception {
+      public Boolean call(int callTimeout) throws ServiceException, 
IOException {
         return MetaTableAccessor.tableExists(connection, tableName);
       }
     });
@@ -340,15 +350,14 @@ public class HBaseAdmin implements Admin {
   @Override
   public HTableDescriptor[] listTables(final Pattern pattern, final boolean 
includeSysTables)
       throws IOException {
-    return executeCallable(new 
MasterCallable<HTableDescriptor[]>(getConnection(),
-        getRpcControllerFactory()) {
+    return executeCallable(new 
MasterCallable<HTableDescriptor[]>(getConnection()) {
       @Override
-      protected HTableDescriptor[] call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
+      public HTableDescriptor[] call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
         GetTableDescriptorsRequest req =
             RequestConverter.buildGetTableDescriptorsRequest(pattern, 
includeSysTables);
-        return 
ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(rpcController,
-            req));
+        return 
ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, 
req));
       }
     });
   }
@@ -377,13 +386,14 @@ public class HBaseAdmin implements Admin {
   @Override
   public TableName[] listTableNames(final Pattern pattern, final boolean 
includeSysTables)
       throws IOException {
-    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
-        getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<TableName[]>(getConnection()) {
       @Override
-      protected TableName[] call(PayloadCarryingRpcController rpcController) 
throws Exception {
+      public TableName[] call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
         GetTableNamesRequest req =
             RequestConverter.buildGetTableNamesRequest(pattern, 
includeSysTables);
-        return 
ProtobufUtil.getTableNameArray(master.getTableNames(rpcController, req)
+        return ProtobufUtil.getTableNameArray(master.getTableNames(controller, 
req)
             .getTableNamesList());
       }
     });
@@ -404,25 +414,27 @@ public class HBaseAdmin implements Admin {
   static HTableDescriptor getTableDescriptor(final TableName tableName, 
Connection connection,
       RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory 
rpcControllerFactory,
       int operationTimeout, int rpcTimeout) throws IOException {
-    if (tableName == null) return null;
-    HTableDescriptor htd =
-        executeCallable(new MasterCallable<HTableDescriptor>(connection, 
rpcControllerFactory) {
-      @Override
-      protected HTableDescriptor call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        GetTableDescriptorsRequest req =
-            RequestConverter.buildGetTableDescriptorsRequest(tableName);
-        GetTableDescriptorsResponse htds = 
master.getTableDescriptors(rpcController, req);
-        if (!htds.getTableSchemaList().isEmpty()) {
-          return 
ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
+      if (tableName == null) return null;
+      HTableDescriptor htd = executeCallable(new 
MasterCallable<HTableDescriptor>(connection) {
+        @Override
+        public HTableDescriptor call(int callTimeout) throws ServiceException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setCallTimeout(callTimeout);
+          GetTableDescriptorsResponse htds;
+          GetTableDescriptorsRequest req =
+                  RequestConverter.buildGetTableDescriptorsRequest(tableName);
+          htds = master.getTableDescriptors(controller, req);
+
+          if (!htds.getTableSchemaList().isEmpty()) {
+            return 
ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0));
+          }
+          return null;
         }
-        return null;
+      }, rpcCallerFactory, operationTimeout, rpcTimeout);
+      if (htd != null) {
+        return htd;
       }
-    }, rpcCallerFactory, operationTimeout, rpcTimeout);
-    if (htd != null) {
-      return htd;
-    }
-    throw new TableNotFoundException(tableName.getNameAsString());
+      throw new TableNotFoundException(tableName.getNameAsString());
   }
 
   private long getPauseTime(int tries) {
@@ -490,14 +502,15 @@ public class HBaseAdmin implements Admin {
     }
 
     CreateTableResponse response = executeCallable(
-      new MasterCallable<CreateTableResponse>(getConnection(), 
getRpcControllerFactory()) {
+      new MasterCallable<CreateTableResponse>(getConnection()) {
         @Override
-        protected CreateTableResponse call(PayloadCarryingRpcController 
rpcController)
-        throws Exception {
-          rpcController.setPriority(desc.getTableName());
+        public CreateTableResponse call(int callTimeout) throws 
ServiceException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setCallTimeout(callTimeout);
+          controller.setPriority(desc.getTableName());
           CreateTableRequest request = 
RequestConverter.buildCreateTableRequest(
             desc, splitKeys, ng.getNonceGroup(), ng.newNonce());
-          return master.createTable(rpcController, request);
+          return master.createTable(controller, request);
         }
       });
     return new CreateTableFuture(this, desc, splitKeys, response);
@@ -541,14 +554,15 @@ public class HBaseAdmin implements Admin {
   @Override
   public Future<Void> deleteTableAsync(final TableName tableName) throws 
IOException {
     DeleteTableResponse response = executeCallable(
-      new MasterCallable<DeleteTableResponse>(getConnection(), 
getRpcControllerFactory()) {
+      new MasterCallable<DeleteTableResponse>(getConnection()) {
         @Override
-        protected DeleteTableResponse call(PayloadCarryingRpcController 
rpcController)
-        throws Exception {
-          rpcController.setPriority(tableName);
+        public DeleteTableResponse call(int callTimeout) throws 
ServiceException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setCallTimeout(callTimeout);
+          controller.setPriority(tableName);
           DeleteTableRequest req =
               RequestConverter.buildDeleteTableRequest(tableName, 
ng.getNonceGroup(),ng.newNonce());
-          return master.deleteTable(rpcController,req);
+          return master.deleteTable(controller,req);
         }
       });
     return new DeleteTableFuture(this, tableName, response);
@@ -622,16 +636,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> truncateTableAsync(final TableName tableName, final 
boolean preserveSplits)
       throws IOException {
     TruncateTableResponse response =
-        executeCallable(new 
MasterCallable<TruncateTableResponse>(getConnection(),
-            getRpcControllerFactory()) {
+        executeCallable(new 
MasterCallable<TruncateTableResponse>(getConnection()) {
           @Override
-          protected TruncateTableResponse call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-            rpcController.setPriority(tableName);
+          public TruncateTableResponse call(int callTimeout) throws 
ServiceException {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            controller.setPriority(tableName);
             LOG.info("Started truncating " + tableName);
             TruncateTableRequest req = 
RequestConverter.buildTruncateTableRequest(
               tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce());
-            return master.truncateTable(rpcController, req);
+            return master.truncateTable(controller, req);
           }
         });
     return new TruncateTableFuture(this, tableName, preserveSplits, response);
@@ -687,15 +701,17 @@ public class HBaseAdmin implements Admin {
   public Future<Void> enableTableAsync(final TableName tableName) throws 
IOException {
     TableName.isLegalFullyQualifiedTableName(tableName.getName());
     EnableTableResponse response = executeCallable(
-      new MasterCallable<EnableTableResponse>(getConnection(), 
getRpcControllerFactory()) {
+      new MasterCallable<EnableTableResponse>(getConnection()) {
         @Override
-        protected EnableTableResponse call(PayloadCarryingRpcController 
rpcController)
-        throws Exception {
-          rpcController.setPriority(tableName);
+        public EnableTableResponse call(int callTimeout) throws 
ServiceException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setCallTimeout(callTimeout);
+          controller.setPriority(tableName);
+
           LOG.info("Started enable of " + tableName);
           EnableTableRequest req =
               RequestConverter.buildEnableTableRequest(tableName, 
ng.getNonceGroup(),ng.newNonce());
-          return master.enableTable(rpcController,req);
+          return master.enableTable(controller,req);
         }
       });
     return new EnableTableFuture(this, tableName, response);
@@ -751,16 +767,18 @@ public class HBaseAdmin implements Admin {
   public Future<Void> disableTableAsync(final TableName tableName) throws 
IOException {
     TableName.isLegalFullyQualifiedTableName(tableName.getName());
     DisableTableResponse response = executeCallable(
-      new MasterCallable<DisableTableResponse>(getConnection(), 
getRpcControllerFactory()) {
+      new MasterCallable<DisableTableResponse>(getConnection()) {
         @Override
-        protected DisableTableResponse call(PayloadCarryingRpcController 
rpcController)
-        throws Exception {
-          rpcController.setPriority(tableName);
+        public DisableTableResponse call(int callTimeout) throws 
ServiceException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setCallTimeout(callTimeout);
+          controller.setPriority(tableName);
+
           LOG.info("Started disable of " + tableName);
           DisableTableRequest req =
               RequestConverter.buildDisableTableRequest(
                 tableName, ng.getNonceGroup(), ng.newNonce());
-          return master.disableTable(rpcController, req);
+          return master.disableTable(controller, req);
         }
       });
     return new DisableTableFuture(this, tableName, response);
@@ -809,9 +827,9 @@ public class HBaseAdmin implements Admin {
   @Override
   public boolean isTableEnabled(final TableName tableName) throws IOException {
     checkTableExists(tableName);
-    return executeCallable(new RpcRetryingCallable<Boolean>() {
+    return executeCallable(new ConnectionCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean rpcCall(int callTimeout) throws Exception {
+      public Boolean call(int callTimeout) throws ServiceException, 
IOException {
         TableState tableState = MetaTableAccessor.getTableState(connection, 
tableName);
         if (tableState == null)
           throw new TableNotFoundException(tableName);
@@ -838,15 +856,16 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public Pair<Integer, Integer> getAlterStatus(final TableName tableName) 
throws IOException {
-    return executeCallable(new MasterCallable<Pair<Integer, 
Integer>>(getConnection(),
-        getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Pair<Integer, 
Integer>>(getConnection()) {
       @Override
-      protected Pair<Integer, Integer> call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        rpcController.setPriority(tableName);
+      public Pair<Integer, Integer> call(int callTimeout) throws 
ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        controller.setPriority(tableName);
+
         GetSchemaAlterStatusRequest req = RequestConverter
             .buildGetSchemaAlterStatusRequest(tableName);
-        GetSchemaAlterStatusResponse ret = 
master.getSchemaAlterStatus(rpcController, req);
+        GetSchemaAlterStatusResponse ret = 
master.getSchemaAlterStatus(controller, req);
         Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(),
             ret.getTotalRegions());
         return pair;
@@ -875,16 +894,17 @@ public class HBaseAdmin implements Admin {
   public Future<Void> addColumnFamily(final TableName tableName,
       final HColumnDescriptor columnFamily) throws IOException {
     AddColumnResponse response =
-        executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
-            getRpcControllerFactory()) {
+        executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) 
{
           @Override
-          protected AddColumnResponse call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-            rpcController.setPriority(tableName);
+          public AddColumnResponse call(int callTimeout) throws 
ServiceException {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            controller.setPriority(tableName);
+
             AddColumnRequest req =
                 RequestConverter.buildAddColumnRequest(tableName, 
columnFamily, ng.getNonceGroup(),
                   ng.newNonce());
-            return master.addColumn(rpcController, req);
+            return master.addColumn(controller, req);
           }
         });
     return new AddColumnFamilyFuture(this, tableName, response);
@@ -919,16 +939,17 @@ public class HBaseAdmin implements Admin {
   public Future<Void> deleteColumnFamily(final TableName tableName, final 
byte[] columnFamily)
       throws IOException {
     DeleteColumnResponse response =
-        executeCallable(new 
MasterCallable<DeleteColumnResponse>(getConnection(),
-            getRpcControllerFactory()) {
+        executeCallable(new 
MasterCallable<DeleteColumnResponse>(getConnection()) {
           @Override
-          protected DeleteColumnResponse call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-            rpcController.setPriority(tableName);
+          public DeleteColumnResponse call(int callTimeout) throws 
ServiceException {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            controller.setPriority(tableName);
+
             DeleteColumnRequest req =
                 RequestConverter.buildDeleteColumnRequest(tableName, 
columnFamily,
                   ng.getNonceGroup(), ng.newNonce());
-            master.deleteColumn(rpcController, req);
+            master.deleteColumn(controller, req);
             return null;
           }
         });
@@ -964,16 +985,17 @@ public class HBaseAdmin implements Admin {
   public Future<Void> modifyColumnFamily(final TableName tableName,
       final HColumnDescriptor columnFamily) throws IOException {
     ModifyColumnResponse response =
-        executeCallable(new 
MasterCallable<ModifyColumnResponse>(getConnection(),
-            getRpcControllerFactory()) {
+        executeCallable(new 
MasterCallable<ModifyColumnResponse>(getConnection()) {
           @Override
-          protected ModifyColumnResponse call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-            rpcController.setPriority(tableName);
+          public ModifyColumnResponse call(int callTimeout) throws 
ServiceException {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            controller.setPriority(tableName);
+
             ModifyColumnRequest req =
                 RequestConverter.buildModifyColumnRequest(tableName, 
columnFamily,
                   ng.getNonceGroup(), ng.newNonce());
-            master.modifyColumn(rpcController, req);
+            master.modifyColumn(controller, req);
             return null;
           }
         });
@@ -1022,26 +1044,28 @@ public class HBaseAdmin implements Admin {
   @Override
   public boolean closeRegionWithEncodedRegionName(final String 
encodedRegionName,
       final String serverName) throws IOException {
-   if (null == serverName || ("").equals(serverName.trim())) {
-      throw new IllegalArgumentException("The servername cannot be null or 
empty.");
+    if (null == serverName || ("").equals(serverName.trim())) {
+      throw new IllegalArgumentException(
+          "The servername cannot be null or empty.");
     }
-    final ServerName sn = ServerName.valueOf(serverName);
-    final AdminService.BlockingInterface admin = connection.getAdmin(sn);
-    final PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-    return executeCallable(new RpcRetryingCallable<Boolean>() {
-      @Override
-      protected Boolean rpcCall(int callTimeout) throws Exception {
-        controller.setCallTimeout(callTimeout);
-        CloseRegionRequest request =
-            RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
-        CloseRegionResponse response = admin.closeRegion(controller, request);
-        boolean closed = response.getClosed();
-        if (false == closed) {
-          LOG.error("Not able to close the region " + encodedRegionName + ".");
-        }
-        return closed;
+    ServerName sn = ServerName.valueOf(serverName);
+    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    // Close the region without updating zk state.
+    CloseRegionRequest request =
+      RequestConverter.buildCloseRegionRequest(sn, encodedRegionName);
+    try {
+      PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+
+      // TODO: this does not do retries, it should. Set priority and timeout 
in controller
+      CloseRegionResponse response = admin.closeRegion(controller, request);
+      boolean isRegionClosed = response.getClosed();
+      if (false == isRegionClosed) {
+        LOG.error("Not able to close the region " + encodedRegionName + ".");
       }
-    });
+      return isRegionClosed;
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }
 
   @Override
@@ -1080,20 +1104,20 @@ public class HBaseAdmin implements Admin {
     if (regionServerPair.getSecond() == null) {
       throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
     }
-    final HRegionInfo hRegionInfo = regionServerPair.getFirst();
+    HRegionInfo hRegionInfo = regionServerPair.getFirst();
     ServerName serverName = regionServerPair.getSecond();
-    final PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-    final AdminService.BlockingInterface admin = 
this.connection.getAdmin(serverName);
-    executeCallable(new RpcRetryingCallable<Void>() {
-      @Override
-      protected Void rpcCall(int callTimeout) throws Exception {
-        controller.setCallTimeout(callTimeout);
-        FlushRegionRequest request =
-            
RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
-        admin.flushRegion(controller, request);
-        return null;
-      }
-    });
+
+    PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+
+    AdminService.BlockingInterface admin = 
this.connection.getAdmin(serverName);
+    FlushRegionRequest request =
+        RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName());
+    try {
+      // TODO: this does not do retries, it should. Set priority and timeout 
in controller
+      admin.flushRegion(controller, request);
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }
 
   /**
@@ -1244,45 +1268,67 @@ public class HBaseAdmin implements Admin {
   private void compact(final ServerName sn, final HRegionInfo hri,
       final boolean major, final byte [] family)
   throws IOException {
-    final PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-    executeCallable(new RpcRetryingCallable<Void>() {
-      @Override
-      protected Void rpcCall(int callTimeout) throws Exception {
-        controller.setCallTimeout(callTimeout);
-        CompactRegionRequest request =
-            RequestConverter.buildCompactRegionRequest(hri.getRegionName(), 
major, family);
-        admin.compactRegion(controller, request);
-        return null;
-      }
-    });
+    PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    CompactRegionRequest request =
+      RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, 
family);
+    try {
+      // TODO: this does not do retries, it should. Set priority and timeout 
in controller
+      admin.compactRegion(controller, request);
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }
 
   @Override
   public void move(final byte [] encodedRegionName, final byte [] 
destServerName)
-  throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+      throws IOException {
+
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        rpcController.setPriority(encodedRegionName);
-        MoveRegionRequest request =
-            RequestConverter.buildMoveRegionRequest(encodedRegionName, 
destServerName);
-        master.moveRegion(rpcController, request);
+      public Void call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        // Hard to know the table name, at least check if meta
+        if (isMetaRegion(encodedRegionName)) {
+          controller.setPriority(TableName.META_TABLE_NAME);
+        }
+
+        try {
+          MoveRegionRequest request =
+              RequestConverter.buildMoveRegionRequest(encodedRegionName, 
destServerName);
+            master.moveRegion(controller, request);
+        } catch (DeserializationException de) {
+          LOG.error("Could not parse destination server name: " + de);
+          throw new ServiceException(new DoNotRetryIOException(de));
+        }
         return null;
       }
     });
   }
 
+  private boolean isMetaRegion(final byte[] regionName) {
+    return Bytes.equals(regionName, 
HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+        || Bytes.equals(regionName, 
HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+  }
+
   @Override
-  public void assign(final byte [] regionName) throws 
MasterNotRunningException,
+  public void assign(final byte[] regionName) throws MasterNotRunningException,
       ZooKeeperConnectionException, IOException {
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+    final byte[] toBeAssigned = getRegionName(regionName);
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        rpcController.setPriority(regionName);
+      public Void call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        // Hard to know the table name, at least check if meta
+        if (isMetaRegion(regionName)) {
+          controller.setPriority(TableName.META_TABLE_NAME);
+        }
+
         AssignRegionRequest request =
-            
RequestConverter.buildAssignRegionRequest(getRegionName(regionName));
-        master.assignRegion(rpcController, request);
+          RequestConverter.buildAssignRegionRequest(toBeAssigned);
+        master.assignRegion(controller,request);
         return null;
       }
     });
@@ -1292,13 +1338,18 @@ public class HBaseAdmin implements Admin {
   public void unassign(final byte [] regionName, final boolean force)
   throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
     final byte[] toBeUnassigned = getRegionName(regionName);
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        rpcController.setPriority(regionName);
+      public Void call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        // Hard to know the table name, at least check if meta
+        if (isMetaRegion(regionName)) {
+          controller.setPriority(TableName.META_TABLE_NAME);
+        }
         UnassignRegionRequest request =
-            RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
-        master.unassignRegion(rpcController, request);
+          RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force);
+        master.unassignRegion(controller, request);
         return null;
       }
     });
@@ -1307,11 +1358,16 @@ public class HBaseAdmin implements Admin {
   @Override
   public void offline(final byte [] regionName)
   throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        rpcController.setPriority(regionName);
-        master.offlineRegion(rpcController, 
RequestConverter.buildOfflineRegionRequest(regionName));
+      public Void call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        // Hard to know the table name, at least check if meta
+        if (isMetaRegion(regionName)) {
+          controller.setPriority(TableName.META_TABLE_NAME);
+        }
+        master.offlineRegion(controller, 
RequestConverter.buildOfflineRegionRequest(regionName));
         return null;
       }
     });
@@ -1320,44 +1376,56 @@ public class HBaseAdmin implements Admin {
   @Override
   public boolean setBalancerRunning(final boolean on, final boolean 
synchronous)
   throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
         SetBalancerRunningRequest req =
             RequestConverter.buildSetBalancerRunningRequest(on, synchronous);
-        return master.setBalancerRunning(rpcController, 
req).getPrevBalanceValue();
+        return master.setBalancerRunning(controller, 
req).getPrevBalanceValue();
       }
     });
   }
 
   @Override
   public boolean balancer() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
-        return master.balance(rpcController,
-            RequestConverter.buildBalanceRequest(false)).getBalancerRan();
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        return master.balance(controller,
+          RequestConverter.buildBalanceRequest(false)).getBalancerRan();
       }
     });
   }
 
   @Override
   public boolean balancer(final boolean force) throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
-        return master.balance(rpcController,
-            RequestConverter.buildBalanceRequest(force)).getBalancerRan();
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        return master.balance(controller,
+          RequestConverter.buildBalanceRequest(force)).getBalancerRan();
       }
     });
   }
 
   @Override
   public boolean isBalancerEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
-        return master.isBalancerEnabled(rpcController,
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        return master.isBalancerEnabled(controller,
           RequestConverter.buildIsBalancerEnabledRequest()).getEnabled();
       }
     });
@@ -1365,10 +1433,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean normalize() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
-        return master.normalize(rpcController,
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        return master.normalize(controller,
           RequestConverter.buildNormalizeRequest()).getNormalizerRan();
       }
     });
@@ -1376,10 +1447,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean isNormalizerEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
-        return master.isNormalizerEnabled(rpcController,
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        return master.isNormalizerEnabled(controller,
           RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled();
       }
     });
@@ -1387,22 +1461,28 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean setNormalizerRunning(final boolean on) throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
         SetNormalizerRunningRequest req =
           RequestConverter.buildSetNormalizerRunningRequest(on);
-        return master.setNormalizerRunning(rpcController, 
req).getPrevNormalizerValue();
+        return master.setNormalizerRunning(controller, 
req).getPrevNormalizerValue();
       }
     });
   }
 
   @Override
   public boolean enableCatalogJanitor(final boolean enable) throws IOException 
{
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
-        return master.enableCatalogJanitor(rpcController,
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        return master.enableCatalogJanitor(controller,
           
RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue();
       }
     });
@@ -1410,10 +1490,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public int runCatalogScan() throws IOException {
-    return executeCallable(new MasterCallable<Integer>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Integer>(getConnection()) {
       @Override
-      protected Integer call(PayloadCarryingRpcController rpcController) 
throws Exception {
-        return master.runCatalogScan(rpcController,
+      public Integer call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        return master.runCatalogScan(controller,
           RequestConverter.buildCatalogScanRequest()).getScanResult();
       }
     });
@@ -1421,10 +1504,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public boolean isCatalogJanitorEnabled() throws IOException {
-    return executeCallable(new MasterCallable<Boolean>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Boolean>(getConnection()) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController rpcController) 
throws Exception {
-        return master.isCatalogJanitorEnabled(rpcController,
+      public Boolean call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        return master.isCatalogJanitorEnabled(controller,
           RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue();
       }
     });
@@ -1530,19 +1616,25 @@ public class HBaseAdmin implements Admin {
     }
 
     DispatchMergingRegionsResponse response =
-        executeCallable(new 
MasterCallable<DispatchMergingRegionsResponse>(getConnection(),
-            getRpcControllerFactory()) {
+      executeCallable(new 
MasterCallable<DispatchMergingRegionsResponse>(getConnection()) {
       @Override
-      protected DispatchMergingRegionsResponse 
call(PayloadCarryingRpcController rpcController)
-          throws Exception {
-        DispatchMergingRegionsRequest request = RequestConverter
-            .buildDispatchMergingRegionsRequest(
+      public DispatchMergingRegionsResponse call(int callTimeout) throws 
ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+
+        try {
+          DispatchMergingRegionsRequest request = RequestConverter
+              .buildDispatchMergingRegionsRequest(
                 encodedNameOfRegionA,
                 encodedNameOfRegionB,
                 forcible,
                 ng.getNonceGroup(),
                 ng.newNonce());
-        return master.dispatchMergingRegions(rpcController, request);
+          return master.dispatchMergingRegions(controller, request);
+        } catch (DeserializationException de) {
+          LOG.error("Could not parse destination server name: " + de);
+          throw new ServiceException(new DoNotRetryIOException(de));
+        }
       }
     });
     return new DispatchMergingRegionsFuture(this, tableName, response);
@@ -1654,17 +1746,21 @@ public class HBaseAdmin implements Admin {
       throw new IllegalArgumentException("the specified table name '" + 
tableName +
         "' doesn't match with the HTD one: " + htd.getTableName());
     }
+
     ModifyTableResponse response = executeCallable(
-      new MasterCallable<ModifyTableResponse>(getConnection(), 
getRpcControllerFactory()) {
+      new MasterCallable<ModifyTableResponse>(getConnection()) {
         @Override
-        protected ModifyTableResponse call(PayloadCarryingRpcController 
rpcController)
-        throws Exception {
-          rpcController.setPriority(tableName);
+        public ModifyTableResponse call(int callTimeout) throws 
ServiceException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setCallTimeout(callTimeout);
+          controller.setPriority(tableName);
+
           ModifyTableRequest request = 
RequestConverter.buildModifyTableRequest(
             tableName, htd, ng.getNonceGroup(), ng.newNonce());
-          return master.modifyTable(rpcController, request);
+          return master.modifyTable(controller, request);
         }
       });
+
     return new ModifyTableFuture(this, tableName, response);
   }
 
@@ -1779,9 +1875,9 @@ public class HBaseAdmin implements Admin {
    */
   private TableName checkTableExists(final TableName tableName)
       throws IOException {
-    return executeCallable(new RpcRetryingCallable<TableName>() {
+    return executeCallable(new ConnectionCallable<TableName>(getConnection()) {
       @Override
-      protected TableName rpcCall(int callTimeout) throws Exception {
+      public TableName call(int callTimeout) throws ServiceException, 
IOException {
         if (!MetaTableAccessor.tableExists(connection, tableName)) {
           throw new TableNotFoundException(tableName);
         }
@@ -1792,11 +1888,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public synchronized void shutdown() throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        rpcController.setPriority(HConstants.HIGH_QOS);
-        master.shutdown(rpcController, ShutdownRequest.newBuilder().build());
+      public Void call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        controller.setPriority(HConstants.HIGH_QOS);
+        master.shutdown(controller, ShutdownRequest.newBuilder().build());
         return null;
       }
     });
@@ -1804,11 +1902,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public synchronized void stopMaster() throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        rpcController.setPriority(HConstants.HIGH_QOS);
-        master.stopMaster(rpcController, 
StopMasterRequest.newBuilder().build());
+      public Void call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        controller.setPriority(HConstants.HIGH_QOS);
+        master.stopMaster(controller, StopMasterRequest.newBuilder().build());
         return null;
       }
     });
@@ -1819,41 +1919,43 @@ public class HBaseAdmin implements Admin {
   throws IOException {
     String hostname = Addressing.parseHostname(hostnamePort);
     int port = Addressing.parsePort(hostnamePort);
-    final AdminService.BlockingInterface admin =
+    AdminService.BlockingInterface admin =
       this.connection.getAdmin(ServerName.valueOf(hostname, port, 0));
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
-      @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        rpcController.setPriority(HConstants.HIGH_QOS);
-        StopServerRequest request = RequestConverter.buildStopServerRequest(
-            "Called by admin client " + this.connection.toString());
-        admin.stopServer(rpcController, request);
-        return null;
-      }
-    });
+    StopServerRequest request = RequestConverter.buildStopServerRequest(
+      "Called by admin client " + this.connection.toString());
+    PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+
+    controller.setPriority(HConstants.HIGH_QOS);
+    try {
+      // TODO: this does not do retries, it should. Set priority and timeout 
in controller
+      admin.stopServer(controller, request);
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }
 
   @Override
   public boolean isMasterInMaintenanceMode() throws IOException {
-    return executeCallable(new 
MasterCallable<IsInMaintenanceModeResponse>(getConnection(),
-        this.rpcControllerFactory) {
+    return executeCallable(new 
MasterCallable<IsInMaintenanceModeResponse>(getConnection()) {
       @Override
-      protected IsInMaintenanceModeResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        return master.isMasterInMaintenanceMode(rpcController,
-            IsInMaintenanceModeRequest.newBuilder().build());
+      public IsInMaintenanceModeResponse call(int callTimeout) throws 
ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        return master.isMasterInMaintenanceMode(
+          controller, IsInMaintenanceModeRequest.newBuilder().build());
       }
     }).getInMaintenanceMode();
   }
 
   @Override
   public ClusterStatus getClusterStatus() throws IOException {
-    return executeCallable(new MasterCallable<ClusterStatus>(getConnection(),
-        this.rpcControllerFactory) {
+    return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
       @Override
-      protected ClusterStatus call(PayloadCarryingRpcController rpcController) 
throws Exception {
+      public ClusterStatus call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
         GetClusterStatusRequest req = 
RequestConverter.buildGetClusterStatusRequest();
-        return ProtobufUtil.convert(master.getClusterStatus(rpcController, 
req).getClusterStatus());
+        return ProtobufUtil.convert(master.getClusterStatus(controller, 
req).getClusterStatus());
       }
     });
   }
@@ -1894,16 +1996,19 @@ public class HBaseAdmin implements Admin {
   public Future<Void> createNamespaceAsync(final NamespaceDescriptor 
descriptor)
       throws IOException {
     CreateNamespaceResponse response =
-        executeCallable(new 
MasterCallable<CreateNamespaceResponse>(getConnection(),
-            getRpcControllerFactory()) {
-      @Override
-      protected CreateNamespaceResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        return master.createNamespace(rpcController,
-          
CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil.
-              toProtoNamespaceDescriptor(descriptor)).build());
-      }
-    });
+        executeCallable(new 
MasterCallable<CreateNamespaceResponse>(getConnection()) {
+          @Override
+          public CreateNamespaceResponse call(int callTimeout) throws 
Exception {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            // TODO: set priority based on NS?
+            return master.createNamespace(controller,
+              CreateNamespaceRequest.newBuilder()
+              .setNamespaceDescriptor(ProtobufUtil
+                .toProtoNamespaceDescriptor(descriptor)).build()
+                );
+          }
+        });
     return new NamespaceFuture(this, descriptor.getName(), 
response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -1922,16 +2027,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor 
descriptor)
       throws IOException {
     ModifyNamespaceResponse response =
-        executeCallable(new 
MasterCallable<ModifyNamespaceResponse>(getConnection(),
-            getRpcControllerFactory()) {
-      @Override
-      protected ModifyNamespaceResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        // TODO: set priority based on NS?
-        return master.modifyNamespace(rpcController, 
ModifyNamespaceRequest.newBuilder().
-          
setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
-       }
-    });
+        executeCallable(new 
MasterCallable<ModifyNamespaceResponse>(getConnection()) {
+          @Override
+          public ModifyNamespaceResponse call(int callTimeout) throws 
Exception {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            // TODO: set priority based on NS?
+            return master.modifyNamespace(controller, 
ModifyNamespaceRequest.newBuilder().
+              
setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build());
+          }
+        });
     return new NamespaceFuture(this, descriptor.getName(), 
response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -1950,16 +2055,16 @@ public class HBaseAdmin implements Admin {
   public Future<Void> deleteNamespaceAsync(final String name)
       throws IOException {
     DeleteNamespaceResponse response =
-        executeCallable(new 
MasterCallable<DeleteNamespaceResponse>(getConnection(),
-            getRpcControllerFactory()) {
-      @Override
-      protected DeleteNamespaceResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        // TODO: set priority based on NS?
-        return master.deleteNamespace(rpcController, 
DeleteNamespaceRequest.newBuilder().
-          setNamespaceName(name).build());
-        }
-      });
+        executeCallable(new 
MasterCallable<DeleteNamespaceResponse>(getConnection()) {
+          @Override
+          public DeleteNamespaceResponse call(int callTimeout) throws 
Exception {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            // TODO: set priority based on NS?
+            return master.deleteNamespace(controller, 
DeleteNamespaceRequest.newBuilder().
+              setNamespaceName(name).build());
+          }
+        });
     return new NamespaceFuture(this, name, response.getProcId()) {
       @Override
       public String getOperationType() {
@@ -1970,94 +2075,100 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public NamespaceDescriptor getNamespaceDescriptor(final String name) throws 
IOException {
-    return executeCallable(new 
MasterCallable<NamespaceDescriptor>(getConnection(),
-        getRpcControllerFactory()) {
-      @Override
-      protected NamespaceDescriptor call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-        return ProtobufUtil.toNamespaceDescriptor(
-            master.getNamespaceDescriptor(rpcController, 
GetNamespaceDescriptorRequest.newBuilder().
+    return
+        executeCallable(new 
MasterCallable<NamespaceDescriptor>(getConnection()) {
+          @Override
+          public NamespaceDescriptor call(int callTimeout) throws Exception {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            return ProtobufUtil.toNamespaceDescriptor(
+              master.getNamespaceDescriptor(controller, 
GetNamespaceDescriptorRequest.newBuilder().
                 setNamespaceName(name).build()).getNamespaceDescriptor());
-      }
-    });
+          }
+        });
   }
 
   @Override
   public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
-    return executeCallable(new 
MasterCallable<NamespaceDescriptor[]>(getConnection(),
-        getRpcControllerFactory()) {
-      @Override
-      protected NamespaceDescriptor[] call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-        List<HBaseProtos.NamespaceDescriptor> list =
-            master.listNamespaceDescriptors(rpcController,
-              
ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList();
-        NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
-        for(int i = 0; i < list.size(); i++) {
-          res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
-        }
-        return res;
-      }
-    });
+    return
+        executeCallable(new 
MasterCallable<NamespaceDescriptor[]>(getConnection()) {
+          @Override
+          public NamespaceDescriptor[] call(int callTimeout) throws Exception {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            List<HBaseProtos.NamespaceDescriptor> list =
+                master.listNamespaceDescriptors(controller,
+                  ListNamespaceDescriptorsRequest.newBuilder().build())
+                .getNamespaceDescriptorList();
+            NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
+            for(int i = 0; i < list.size(); i++) {
+              res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
+            }
+            return res;
+          }
+        });
   }
 
   @Override
   public ProcedureInfo[] listProcedures() throws IOException {
-    return executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection(),
-        getRpcControllerFactory()) {
-      @Override
-      protected ProcedureInfo[] call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-        List<ProcedureProtos.Procedure> procList = master.listProcedures(
-            rpcController, 
ListProceduresRequest.newBuilder().build()).getProcedureList();
-        ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
-        for (int i = 0; i < procList.size(); i++) {
-          procInfoList[i] = ProcedureUtil.convert(procList.get(i));
-        }
-        return procInfoList;
-      }
-    });
+    return
+        executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) {
+          @Override
+          public ProcedureInfo[] call(int callTimeout) throws Exception {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            List<ProcedureProtos.Procedure> procList = master.listProcedures(
+              controller, 
ListProceduresRequest.newBuilder().build()).getProcedureList();
+            ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()];
+            for (int i = 0; i < procList.size(); i++) {
+              procInfoList[i] = ProcedureUtil.convert(procList.get(i));
+            }
+            return procInfoList;
+          }
+        });
   }
 
   @Override
   public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) 
throws IOException {
-    return executeCallable(new 
MasterCallable<HTableDescriptor[]>(getConnection(),
-        getRpcControllerFactory()) {
-      @Override
-      protected HTableDescriptor[] call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-        List<TableSchema> list =
-            master.listTableDescriptorsByNamespace(rpcController,
-                
ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
-                .build()).getTableSchemaList();
-        HTableDescriptor[] res = new HTableDescriptor[list.size()];
-        for(int i=0; i < list.size(); i++) {
-
-          res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
-        }
-        return res;
-      }
-    });
+    return
+        executeCallable(new 
MasterCallable<HTableDescriptor[]>(getConnection()) {
+          @Override
+          public HTableDescriptor[] call(int callTimeout) throws Exception {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            List<TableSchema> list =
+                master.listTableDescriptorsByNamespace(controller,
+                  
ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name)
+                  .build()).getTableSchemaList();
+            HTableDescriptor[] res = new HTableDescriptor[list.size()];
+            for(int i=0; i < list.size(); i++) {
+
+              res[i] = ProtobufUtil.convertToHTableDesc(list.get(i));
+            }
+            return res;
+          }
+        });
   }
 
   @Override
   public TableName[] listTableNamesByNamespace(final String name) throws 
IOException {
-    return executeCallable(new MasterCallable<TableName[]>(getConnection(),
-        getRpcControllerFactory()) {
-      @Override
-      protected TableName[] call(PayloadCarryingRpcController rpcController)
-          throws Exception {
-        List<HBaseProtos.TableName> tableNames =
-            master.listTableNamesByNamespace(rpcController, 
ListTableNamesByNamespaceRequest.
+    return
+        executeCallable(new MasterCallable<TableName[]>(getConnection()) {
+          @Override
+          public TableName[] call(int callTimeout) throws Exception {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            List<HBaseProtos.TableName> tableNames =
+              master.listTableNamesByNamespace(controller, 
ListTableNamesByNamespaceRequest.
                 newBuilder().setNamespaceName(name).build())
-            .getTableNameList();
-        TableName[] result = new TableName[tableNames.size()];
-        for (int i = 0; i < tableNames.size(); i++) {
-          result[i] = ProtobufUtil.toTableName(tableNames.get(i));
-        }
-        return result;
-      }
-    });
+                .getTableNameList();
+            TableName[] result = new TableName[tableNames.size()];
+            for (int i = 0; i < tableNames.size(); i++) {
+              result[i] = ProtobufUtil.toTableName(tableNames.get(i));
+            }
+            return result;
+          }
+        });
   }
 
   /**
@@ -2065,26 +2176,10 @@ public class HBaseAdmin implements Admin {
    * @param conf system configuration
    * @throws MasterNotRunningException if the master is not running
    * @throws ZooKeeperConnectionException if unable to connect to zookeeper
-   * @deprecated since hbase-2.0.0 because throws a ServiceException. We don't 
want to have
-   * protobuf as part of our public API. Use {@link #available(Configuration)}
    */
   // Used by tests and by the Merge tool. Merge tool uses it to figure if 
HBase is up or not.
-  // MOB uses it too.
-  // NOTE: hbase-2.0.0 removes ServiceException from the throw.
-  @Deprecated
   public static void checkHBaseAvailable(Configuration conf)
-  throws MasterNotRunningException, ZooKeeperConnectionException, IOException,
-  com.google.protobuf.ServiceException {
-    available(conf);
-  }
-
-  /**
-   * Is HBase available? Throw an exception if not.
-   * @param conf system configuration
-   * @throws ZooKeeperConnectionException if unable to connect to zookeeper]
-   */
-  public static void available(final Configuration conf)
-  throws ZooKeeperConnectionException, InterruptedIOException {
+  throws MasterNotRunningException, ZooKeeperConnectionException, 
ServiceException, IOException {
     Configuration copyOfConf = HBaseConfiguration.create(conf);
     // We set it to make it fail as soon as possible if HBase is not available
     copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
@@ -2096,6 +2191,7 @@ public class HBaseAdmin implements Admin {
              (ClusterConnection) 
ConnectionFactory.createConnection(copyOfConf);
          ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) 
connection).
              getKeepAliveZooKeeperWatcher();) {
+
       // This is NASTY. FIX!!!! Dependent on internal implementation! TODO
       zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, 
false);
       connection.isMasterRunning();
@@ -2135,15 +2231,14 @@ public class HBaseAdmin implements Admin {
   @Override
   public HTableDescriptor[] getTableDescriptorsByTableName(final 
List<TableName> tableNames)
   throws IOException {
-    return executeCallable(new 
MasterCallable<HTableDescriptor[]>(getConnection(),
-        getRpcControllerFactory()) {
+    return executeCallable(new 
MasterCallable<HTableDescriptor[]>(getConnection()) {
       @Override
-      protected HTableDescriptor[] call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
+      public HTableDescriptor[] call(int callTimeout) throws Exception {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
         GetTableDescriptorsRequest req =
             RequestConverter.buildGetTableDescriptorsRequest(tableNames);
-          return ProtobufUtil.
-              
getHTableDescriptorArray(master.getTableDescriptors(rpcController, req));
+          return 
ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, 
req));
       }
     });
   }
@@ -2181,16 +2276,16 @@ public class HBaseAdmin implements Admin {
 
   private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws 
IOException,
       FailedLogCloseException {
-    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-    Callable<RollWALWriterResponse> callable = new 
Callable<RollWALWriterResponse>() {
-      @Override
-      public RollWALWriterResponse call() throws Exception {
-        RollWALWriterRequest request = 
RequestConverter.buildRollWALWriterRequest();
-        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-        return admin.rollWALWriter(controller, request);
-      }
-    };
-    return ProtobufUtil.call(callable);
+    AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+    RollWALWriterRequest request = 
RequestConverter.buildRollWALWriterRequest();
+    PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+
+    try {
+      // TODO: this does not do retries, it should. Set priority and timeout 
in controller
+      return admin.rollWALWriter(controller, request);
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }
 
   /**
@@ -2226,7 +2321,8 @@ public class HBaseAdmin implements Admin {
     }
     byte[][] regionsToFlush = new byte[regionCount][];
     for (int i = 0; i < regionCount; i++) {
-      regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i));
+      ByteString region = response.getRegionToFlush(i);
+      regionsToFlush[i] = region.toByteArray();
     }
     return regionsToFlush;
   }
@@ -2256,31 +2352,28 @@ public class HBaseAdmin implements Admin {
   @Override
   public CompactionState getCompactionStateForRegion(final byte[] regionName)
   throws IOException {
-    final Pair<HRegionInfo, ServerName> regionServerPair = 
getRegion(regionName);
-    if (regionServerPair == null) {
-      throw new IllegalArgumentException("Invalid region: " + 
Bytes.toStringBinary(regionName));
-    }
-    if (regionServerPair.getSecond() == null) {
-      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
-    }
-    ServerName sn = regionServerPair.getSecond();
-    final AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
-    Callable<CompactionState> callable = new Callable<CompactionState>() {
-      @Override
-      public CompactionState call() throws Exception {
-        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
-        GetRegionInfoRequest request = 
RequestConverter.buildGetRegionInfoRequest(
-            regionServerPair.getFirst().getRegionName(), true);
-
-        // TODO: this does not do retries, it should. Set priority and timeout 
in controller
-        GetRegionInfoResponse response = admin.getRegionInfo(controller, 
request);
-        if (response.getCompactionState() != null) {
-          return 
ProtobufUtil.createCompactionState(response.getCompactionState());
-        }
-        return null;
+    try {
+      Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName);
+      if (regionServerPair == null) {
+        throw new IllegalArgumentException("Invalid region: " + 
Bytes.toStringBinary(regionName));
+      }
+      if (regionServerPair.getSecond() == null) {
+        throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
+      }
+      ServerName sn = regionServerPair.getSecond();
+      AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
+      GetRegionInfoRequest request = 
RequestConverter.buildGetRegionInfoRequest(
+        regionServerPair.getFirst().getRegionName(), true);
+      PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+      // TODO: this does not do retries, it should. Set priority and timeout 
in controller
+      GetRegionInfoResponse response = admin.getRegionInfo(controller, 
request);
+      if (response.getCompactionState() != null) {
+        return 
ProtobufUtil.createCompactionState(response.getCompactionState());
       }
-    };
-    return ProtobufUtil.call(callable);
+      return null;
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }
 
   @Override
@@ -2332,12 +2425,12 @@ public class HBaseAdmin implements Admin {
         throw (InterruptedIOException)new 
InterruptedIOException("Interrupted").initCause(e);
       }
       LOG.debug("Getting current status of snapshot from master...");
-      done = executeCallable(new 
MasterCallable<IsSnapshotDoneResponse>(getConnection(),
-          getRpcControllerFactory()) {
+      done = executeCallable(new 
MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
         @Override
-        protected IsSnapshotDoneResponse call(PayloadCarryingRpcController 
rpcController)
-        throws Exception {
-          return master.isSnapshotDone(rpcController, request);
+        public IsSnapshotDoneResponse call(int callTimeout) throws 
ServiceException {
+          PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+          controller.setCallTimeout(callTimeout);
+          return master.isSnapshotDone(controller, request);
         }
       });
     }
@@ -2383,12 +2476,12 @@ public class HBaseAdmin implements Admin {
     final SnapshotRequest request = 
SnapshotRequest.newBuilder().setSnapshot(snapshot)
         .build();
     // run the snapshot on the master
-    return executeCallable(new 
MasterCallable<SnapshotResponse>(getConnection(),
-        getRpcControllerFactory()) {
+    return executeCallable(new 
MasterCallable<SnapshotResponse>(getConnection()) {
       @Override
-      protected SnapshotResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        return master.snapshot(rpcController, request);
+      public SnapshotResponse call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        return master.snapshot(controller, request);
       }
     });
   }
@@ -2397,12 +2490,12 @@ public class HBaseAdmin implements Admin {
   public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc)
       throws IOException, HBaseSnapshotException, UnknownSnapshotException {
     final HBaseProtos.SnapshotDescription snapshot = 
createHBaseProtosSnapshotDesc(snapshotDesc);
-    return executeCallable(new 
MasterCallable<IsSnapshotDoneResponse>(getConnection(),
-        getRpcControllerFactory()) {
+    return executeCallable(new 
MasterCallable<IsSnapshotDoneResponse>(getConnection()) {
       @Override
-      protected IsSnapshotDoneResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        return master.isSnapshotDone(rpcController,
+      public IsSnapshotDoneResponse call(int callTimeout) throws 
ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        return master.isSnapshotDone(controller,
           IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build());
       }
     }).getDone();
@@ -2581,11 +2674,12 @@ public class HBaseAdmin implements Admin {
         .setProcedure(builder.build()).build();
     // run the procedure on the master
     ExecProcedureResponse response = executeCallable(new 
MasterCallable<ExecProcedureResponse>(
-        getConnection(), getRpcControllerFactory()) {
+        getConnection()) {
       @Override
-      protected ExecProcedureResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        return master.execProcedureWithRet(rpcController, request);
+      public ExecProcedureResponse call(int callTimeout) throws 
ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        return master.execProcedureWithRet(controller, request);
       }
     });
 
@@ -2607,11 +2701,12 @@ public class HBaseAdmin implements Admin {
         .setProcedure(builder.build()).build();
     // run the procedure on the master
     ExecProcedureResponse response = executeCallable(new 
MasterCallable<ExecProcedureResponse>(
-        getConnection(), getRpcControllerFactory()) {
+        getConnection()) {
       @Override
-      protected ExecProcedureResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
-        return master.execProcedure(rpcController, request);
+      public ExecProcedureResponse call(int callTimeout) throws 
ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        return master.execProcedure(controller, request);
       }
     });
 
@@ -2655,11 +2750,12 @@ public class HBaseAdmin implements Admin {
     }
     final ProcedureDescription desc = builder.build();
     return executeCallable(
-        new MasterCallable<IsProcedureDoneResponse>(getConnection(), 
getRpcControllerFactory()) {
+        new MasterCallable<IsProcedureDoneResponse>(getConnection()) {
           @Override
-          protected IsProcedureDoneResponse call(PayloadCarryingRpcController 
rpcController)
-          throws Exception {
-            return master.isProcedureDone(rpcController, IsProcedureDoneRequest
+          public IsProcedureDoneResponse call(int callTimeout) throws 
ServiceException {
+            PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+            controller.setCallTimeout(callTimeout);
+            return master.isProcedureDone(controller, IsProcedureDoneRequest
                 .newBuilder().setProcedure(desc).build());
           }
         }).getDone();
@@ -2685,16 +2781,17 @@ public class HBaseAdmin implements Admin {
     ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
 
     RestoreSnapshotResponse response = executeCallable(
-        new MasterCallable<RestoreSnapshotResponse>(getConnection(), 
getRpcControllerFactory()) {
+        new MasterCallable<RestoreSnapshotResponse>(getConnection()) {
       @Override
-      protected RestoreSnapshotResponse call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
+      public RestoreSnapshotResponse call(int callTimeout) throws 
ServiceException {
         final RestoreSnapshotRequest request = 
RestoreSnapshotRequest.newBuilder()
             .setSnapshot(snapshot)
             .setNonceGroup(ng.getNonceGroup())
             .setNonce(ng.newNonce())
             .build();
-        return master.restoreSnapshot(rpcController, request);
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        return master.restoreSnapshot(controller, request);
       }
     });
 
@@ -2731,13 +2828,13 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public List<SnapshotDescription> listSnapshots() throws IOException {
-    return executeCallable(new 
MasterCallable<List<SnapshotDescription>>(getConnection(),
-        getRpcControllerFactory()) {
+    return executeCallable(new 
MasterCallable<List<SnapshotDescription>>(getConnection()) {
       @Override
-      protected List<SnapshotDescription> call(PayloadCarryingRpcController 
rpcController)
-      throws Exception {
+      public List<SnapshotDescription> call(int callTimeout) throws 
ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
         List<HBaseProtos.SnapshotDescription> snapshotsList = master
-            .getCompletedSnapshots(rpcController, 
GetCompletedSnapshotsRequest.newBuilder().build())
+            .getCompletedSnapshots(controller, 
GetCompletedSnapshotsRequest.newBuilder().build())
             .getSnapshotsList();
         List<SnapshotDescription> result = new 
ArrayList<SnapshotDescription>(snapshotsList.size());
         for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) {
@@ -2800,11 +2897,14 @@ public class HBaseAdmin implements Admin {
     // make sure the snapshot is possibly valid
     TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
     // do the delete
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        master.deleteSnapshot(rpcController,
-          DeleteSnapshotRequest.newBuilder().setSnapshot(
+      public Void call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        master.deleteSnapshot(controller,
+          DeleteSnapshotRequest.newBuilder().
+              setSnapshot(
                 
HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build())
               .build()
         );
@@ -2833,10 +2933,12 @@ public class HBaseAdmin implements Admin {
   }
 
   private void internalDeleteSnapshot(final SnapshotDescription snapshot) 
throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
-        this.master.deleteSnapshot(rpcController, 
DeleteSnapshotRequest.newBuilder()
+      public Void call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
+        this.master.deleteSnapshot(controller, 
DeleteSnapshotRequest.newBuilder()
           .setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build());
         return null;
       }
@@ -2865,10 +2967,11 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public void setQuota(final QuotaSettings quota) throws IOException {
-    executeCallable(new MasterCallable<Void>(getConnection(), 
getRpcControllerFactory()) {
+    executeCallable(new MasterCallable<Void>(getConnection()) {
       @Override
-      protected Void call(PayloadCarryingRpcController rpcController) throws 
Exception {
+      public Void call(int callTimeout) throws ServiceException {
         PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
         this.master.setQuota(controller, 
QuotaSettings.buildSetQuotaRequestProto(quota));
         return null;
       }
@@ -2886,8 +2989,8 @@ public class HBaseAdmin implements Admin {
   }
 
   static private <C extends RetryingCallable<V> & Closeable, V> V 
executeCallable(C callable,
-             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, 
int rpcTimeout)
-  throws IOException {
+             RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout,
+      int rpcTimeout) throws IOException {
     RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout);
     try {
       return caller.callWithRetries(callable, operationTimeout);
@@ -2905,6 +3008,7 @@ public class HBaseAdmin implements Admin {
    * Simple {@link Abortable}, throwing RuntimeException on abort.
    */
   private static class ThrowableAbortable implements Abortable {
+
     @Override
     public void abort(String why, Throwable e) {
       throw new RuntimeException(why, e);
@@ -2922,16 +3026,13 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
-  public void updateConfiguration(final ServerName server) throws IOException {
-    final AdminService.BlockingInterface admin = 
this.connection.getAdmin(server);
-    Callable<Void> callable = new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        admin.updateConfiguration(null, 
UpdateConfigurationRequest.getDefaultInstance());
-        return null;
-      }
-    };
-    ProtobufUtil.call(callable);
+  public void updateConfiguration(ServerName server) throws IOException {
+    try {
+      this.connection.getAdmin(server).updateConfiguration(null,
+        UpdateConfigurationRequest.getDefaultInstance());
+    } catch (ServiceException e) {
+      throw ProtobufUtil.getRemoteException(e);
+    }
   }
 
   @Override
@@ -2944,7 +3045,8 @@ public class HBaseAdmin implements Admin {
   @Override
   public int getMasterInfoPort() throws IOException {
     // TODO: Fix!  Reaching into internal implementation!!!!
-    ConnectionImplementation connection = 
(ConnectionImplementation)this.connection;
+    ConnectionImplementation connection =
+        (ConnectionImplementation)this.connection;
     ZooKeeperKeepAliveConnection zkw = 
connection.getKeepAliveZooKeeperWatcher();
     try {
       return MasterAddressTracker.getMasterInfoPort(zkw);
@@ -2955,7 +3057,8 @@ public class HBaseAdmin implements Admin {
 
   private ServerName getMasterAddress() throws IOException {
     // TODO: Fix!  Reaching into internal implementation!!!!
-    ConnectionImplementation connection = 
(ConnectionImplementation)this.connection;
+    ConnectionImplementation connection =
+            (ConnectionImplementation)this.connection;
     ZooKeeperKeepAliveConnection zkw = 
connection.getKeepAliveZooKeeperWatcher();
     try {
       return MasterAddressTracker.getMasterAddress(zkw);
@@ -2966,26 +3069,33 @@ public class HBaseAdmin implements Admin {
 
   @Override
   public long getLastMajorCompactionTimestamp(final TableName tableName) 
throws IOException {
-    return executeCallable(new MasterCallable<Long>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Long>(getConnection()) {
       @Override
-      protected Long call(PayloadCarryingRpcController rpcController) throws 
Exception {
+      public Long call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
         MajorCompactionTimestampRequest req =
             MajorCompactionTimestampRequest.newBuilder()
                 
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
-        return master.getLastMajorCompactionTimestamp(rpcController, 
req).getCompactionTimestamp();
+        return master.getLastMajorCompactionTimestamp(controller, 
req).getCompactionTimestamp();
       }
     });
   }
 
   @Override
   public long getLastMajorCompactionTimestampForRegion(final byte[] 
regionName) throws IOException {
-    return executeCallable(new MasterCallable<Long>(getConnection(), 
getRpcControllerFactory()) {
+    return executeCallable(new MasterCallable<Long>(getConnection()) {
       @Override
-      protected Long call(PayloadCarryingRpcController rpcController) throws 
Exception {
+      public Long call(int callTimeout) throws ServiceException {
+        PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
+        controller.setCallTimeout(callTimeout);
         MajorCompactionTimestampForRegionRequest req =
-            
MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter
+            MajorCompactionTimestampForRegionRequest
+                .newBuilder()
+                .setRegion(
+                  RequestConverter
                       .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName)).build();
-        return master.getLastMajorCompactionTimestampForRegion(rpcController, 
req)
+        return master.getLastMajorCompactionTimestampForRegion(controller, req)
             .getCompactionTimestamp();
       }
     });
@@ -3024,35 +3134,32 @@ public class HBaseAdmin implements Admin {
   @Override
   public void majorCompact(final TableName tableName, CompactType compactType)
           throws IOException, InterruptedException {
-    compact(tableName, null, true, compactType);
+      compact(tableName, null, true, compactType);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public CompactionState getCompactionState(final TableName tableName,
+  public CompactionState getCompactionState(TableName tableName,
     CompactType compactType) throws IOException {
     AdminProtos.GetRegionInfoResponse.CompactionState state =
         AdminProtos.GetRegionInfoResponse.CompactionState.NONE;
     checkTableExists(tableName);
-    final PayloadCarryingRpcController rpcController = 
rpcControllerFactory.newController();
+    PayloadCarryingRpcController controller = 
rpcControllerFactory.newController();
     switch (compactType) {
       case MOB:
-        final AdminProtos.AdminService.BlockingInterface masterAdmin =
-          this.connection.getAdmin(getMasterAddress());
-        Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable =
-            new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() {
-          @Override
-          public AdminProtos.GetRegionInfoResponse.CompactionState call() 
throws Exception {
-            HRegionInfo info = getMobRegionInfo(tableName);
-            GetRegionInfoRequest request = 
RequestConverter.buildGetRegionInfoRequest(
-                info.getRegionName(), true);
-            GetRegionInfoResponse response = 
masterAdmin.getRegionInfo(rpcController, request);
-            return response.getCompactionState();
-          }
-        };
-        state = ProtobufUtil.call(callable);
+        try {
+          ServerName master = getMasterAddress();
+          HRegionInfo info = getMobRegionInfo(tableName);
+          GetRegionInfoRequest request = 
RequestConverter.buildGetRegionInfoRequest(
+                  info.getRegionName(), true);
+          GetRegionInfoResponse response = this.connection.getAdmin(master)
+                  .getRegionInfo(controller, request);
+          state = response.getCompactionState();
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
+        }
         break;
       case NORMAL:
       default:
@@ -3066,23 +3173,15 @@ public class HBaseAdmin implements Admin {
           } else {
             pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, 
tableName);
           }
-          for (Pair<HRegionInfo, ServerName> pair: pairs) {
+          for (Pair<HRegionInfo, ServerName> pair : pairs) {
             if (pair.getFirst().isOffline()) continue;
             if (pair.getSecond() == null) continue;
-            final ServerName sn = pair.getSecond();
-            final byte [] regionName = pair.getFirst().getRegionName();
-            final AdminService.BlockingInterface snAdmin = 
this.connection.getAdmin(sn);
             try {
-              Callable<GetRegionInfoResponse> regionInfoCallable =
-                  new Callable<GetRegionInfoResponse>() {
-                @Override
-                public GetRegionInfoResponse call() throws Exception {
-                  GetRegionInfoRequest request = 
RequestConverter.buildGetRegionInfoRequest(
-                      regionName, true);
-                  return snAdmin.getRegionInfo(rpcController, request);
-                }
-              };
-              GetRegionInfoResponse response = 
ProtobufUtil.call(regionInfoCallable);
+              ServerName sn = pair.getSecond();
+              AdminService.BlockingInterface admin = 
this.connection.getAdmin(sn);
+              GetRegionInfoRequest request = 
RequestConverter.buildGetRegionInfoRequest(
+                      pair.getFirst().getRegionName(), true);
+              GetRegionInfoResponse response = admin.getRegionInfo(controller, 
request);
               switch (response.getCompactionState()) {
                 case MAJOR_AND_MINOR:
                   return CompactionState.MAJOR_AND_MINOR;
@@ -3118,6 +3217,8 @@ public class HBaseAdmin implements Admin {
               }
             }
           }
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
         } finally {
           if (zookeeper != null) {
             zookeeper.close();
@@ -3182,11 +3283,12 @@ public class HBaseAdmin implements Admin {
     protected AbortProcedureResponse abortProcedureResult(
         final AbortProcedureRequest request) throws IOException {
       return admin.executeCallable(new MasterCallable<AbortProcedureResponse>(
-          admin.getConnection(), admin.getRpcControllerFactory()) {
+          admin.getConnection()) {
         @Override
-        protected AbortProcedureResponse call(PayloadCarryingRpcController 
rcpController)
-        throws Exception {
-          return master.abortProcedure(rcpController, request);
+        public AbortProcedureResponse call(int callTimeout) throws 
ServiceException {
+          PayloadCarryingRpcController controller = 
admin.getRpcControllerFactory().newCon

<TRUNCATED>

Reply via email to