http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 29650ef..fa18bd8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -18,10 +18,6 @@ */ package org.apache.hadoop.hbase.client; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; -import com.google.protobuf.ServiceException; - import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -32,6 +28,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -69,7 +66,6 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.security.SecurityCapability; -import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; @@ -183,6 +179,8 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; +import com.google.common.annotations.VisibleForTesting; + /** * HBaseAdmin is no longer a client API. 
It is marked InterfaceAudience.Private indicating that * this is an HBase-internal class as defined in @@ -211,10 +209,6 @@ public class HBaseAdmin implements Admin { private volatile Configuration conf; private final long pause; private final int numRetries; - // Some operations can take a long time such as disable of big table. - // numRetries is for 'normal' stuff... Multiply by this factor when - // want to wait a long time. - private final int retryLongerMultiplier; private final int syncWaitTimeout; private boolean aborted; private int operationTimeout; @@ -239,8 +233,6 @@ public class HBaseAdmin implements Admin { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.retryLongerMultiplier = this.conf.getInt( - "hbase.client.retries.longer.multiplier", 10); this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, @@ -262,7 +254,7 @@ public class HBaseAdmin implements Admin { } @Override - public boolean isAborted(){ + public boolean isAborted() { return this.aborted; } @@ -274,18 +266,16 @@ public class HBaseAdmin implements Admin { } @Override - public Future<Boolean> abortProcedureAsync( - final long procId, - final boolean mayInterruptIfRunning) throws IOException { + public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning) + throws IOException { Boolean abortProcResponse = executeCallable( - new MasterCallable<AbortProcedureResponse>(getConnection()) { + new MasterCallable<AbortProcedureResponse>(getConnection(), getRpcControllerFactory()) { @Override - public AbortProcedureResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); + 
protected AbortProcedureResponse call(PayloadCarryingRpcController rpcController) + throws Exception { AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder().setProcId(procId).build(); - return master.abortProcedure(controller, abortProcRequest); + return master.abortProcedure(rpcController, abortProcRequest); } }).getIsProcedureAborted(); @@ -324,9 +314,9 @@ public class HBaseAdmin implements Admin { @Override public boolean tableExists(final TableName tableName) throws IOException { - return executeCallable(new ConnectionCallable<Boolean>(getConnection()) { + return executeCallable(new RpcRetryingCallable<Boolean>() { @Override - public Boolean call(int callTimeout) throws ServiceException, IOException { + protected Boolean rpcCall(int callTimeout) throws Exception { return MetaTableAccessor.tableExists(connection, tableName); } }); @@ -350,14 +340,15 @@ public class HBaseAdmin implements Admin { @Override public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables) throws IOException { - return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { + return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), + getRpcControllerFactory()) { @Override - public HTableDescriptor[] call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); + protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController) + throws Exception { GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(rpcController, + req)); } }); } @@ -386,14 +377,13 @@ public class HBaseAdmin implements Admin { @Override public TableName[] listTableNames(final Pattern 
pattern, final boolean includeSysTables) throws IOException { - return executeCallable(new MasterCallable<TableName[]>(getConnection()) { + return executeCallable(new MasterCallable<TableName[]>(getConnection(), + getRpcControllerFactory()) { @Override - public TableName[] call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); + protected TableName[] call(PayloadCarryingRpcController rpcController) throws Exception { GetTableNamesRequest req = RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables); - return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req) + return ProtobufUtil.getTableNameArray(master.getTableNames(rpcController, req) .getTableNamesList()); } }); @@ -414,27 +404,25 @@ public class HBaseAdmin implements Admin { static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection, RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, int operationTimeout, int rpcTimeout) throws IOException { - if (tableName == null) return null; - HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) { - @Override - public HTableDescriptor call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - GetTableDescriptorsResponse htds; - GetTableDescriptorsRequest req = - RequestConverter.buildGetTableDescriptorsRequest(tableName); - htds = master.getTableDescriptors(controller, req); - - if (!htds.getTableSchemaList().isEmpty()) { - return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0)); - } - return null; + if (tableName == null) return null; + HTableDescriptor htd = + executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) { + @Override + protected HTableDescriptor 
call(PayloadCarryingRpcController rpcController) + throws Exception { + GetTableDescriptorsRequest req = + RequestConverter.buildGetTableDescriptorsRequest(tableName); + GetTableDescriptorsResponse htds = master.getTableDescriptors(rpcController, req); + if (!htds.getTableSchemaList().isEmpty()) { + return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0)); } - }, rpcCallerFactory, operationTimeout, rpcTimeout); - if (htd != null) { - return htd; + return null; } - throw new TableNotFoundException(tableName.getNameAsString()); + }, rpcCallerFactory, operationTimeout, rpcTimeout); + if (htd != null) { + return htd; + } + throw new TableNotFoundException(tableName.getNameAsString()); } private long getPauseTime(int tries) { @@ -502,15 +490,14 @@ public class HBaseAdmin implements Admin { } CreateTableResponse response = executeCallable( - new MasterCallable<CreateTableResponse>(getConnection()) { + new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) { @Override - public CreateTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(desc.getTableName()); + protected CreateTableResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(desc.getTableName()); CreateTableRequest request = RequestConverter.buildCreateTableRequest( desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); - return master.createTable(controller, request); + return master.createTable(rpcController, request); } }); return new CreateTableFuture(this, desc, splitKeys, response); @@ -554,15 +541,14 @@ public class HBaseAdmin implements Admin { @Override public Future<Void> deleteTableAsync(final TableName tableName) throws IOException { DeleteTableResponse response = executeCallable( - new MasterCallable<DeleteTableResponse>(getConnection()) { + new 
MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) { @Override - public DeleteTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); + protected DeleteTableResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); - return master.deleteTable(controller,req); + return master.deleteTable(rpcController,req); } }); return new DeleteTableFuture(this, tableName, response); @@ -636,16 +622,16 @@ public class HBaseAdmin implements Admin { public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits) throws IOException { TruncateTableResponse response = - executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) { + executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(), + getRpcControllerFactory()) { @Override - public TruncateTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); + protected TruncateTableResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); LOG.info("Started truncating " + tableName); TruncateTableRequest req = RequestConverter.buildTruncateTableRequest( tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce()); - return master.truncateTable(controller, req); + return master.truncateTable(rpcController, req); } }); return new TruncateTableFuture(this, tableName, preserveSplits, response); @@ -701,17 +687,15 @@ public class HBaseAdmin implements Admin { public Future<Void> 
enableTableAsync(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); EnableTableResponse response = executeCallable( - new MasterCallable<EnableTableResponse>(getConnection()) { + new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) { @Override - public EnableTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); - + protected EnableTableResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); LOG.info("Started enable of " + tableName); EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); - return master.enableTable(controller,req); + return master.enableTable(rpcController,req); } }); return new EnableTableFuture(this, tableName, response); @@ -767,18 +751,16 @@ public class HBaseAdmin implements Admin { public Future<Void> disableTableAsync(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); DisableTableResponse response = executeCallable( - new MasterCallable<DisableTableResponse>(getConnection()) { + new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) { @Override - public DisableTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); - + protected DisableTableResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); LOG.info("Started disable of " + tableName); DisableTableRequest req = RequestConverter.buildDisableTableRequest( tableName, ng.getNonceGroup(), ng.newNonce()); - 
return master.disableTable(controller, req); + return master.disableTable(rpcController, req); } }); return new DisableTableFuture(this, tableName, response); @@ -827,9 +809,9 @@ public class HBaseAdmin implements Admin { @Override public boolean isTableEnabled(final TableName tableName) throws IOException { checkTableExists(tableName); - return executeCallable(new ConnectionCallable<Boolean>(getConnection()) { + return executeCallable(new RpcRetryingCallable<Boolean>() { @Override - public Boolean call(int callTimeout) throws ServiceException, IOException { + protected Boolean rpcCall(int callTimeout) throws Exception { TableState tableState = MetaTableAccessor.getTableState(connection, tableName); if (tableState == null) throw new TableNotFoundException(tableName); @@ -856,16 +838,15 @@ public class HBaseAdmin implements Admin { @Override public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException { - return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) { + return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(), + getRpcControllerFactory()) { @Override - public Pair<Integer, Integer> call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); - + protected Pair<Integer, Integer> call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); GetSchemaAlterStatusRequest req = RequestConverter .buildGetSchemaAlterStatusRequest(tableName); - GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req); + GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(rpcController, req); Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(), ret.getTotalRegions()); return pair; @@ -894,17 +875,16 @@ public class HBaseAdmin implements Admin { public 
Future<Void> addColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily) throws IOException { AddColumnResponse response = - executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) { + executeCallable(new MasterCallable<AddColumnResponse>(getConnection(), + getRpcControllerFactory()) { @Override - public AddColumnResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); - + protected AddColumnResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - return master.addColumn(controller, req); + return master.addColumn(rpcController, req); } }); return new AddColumnFamilyFuture(this, tableName, response); @@ -939,17 +919,16 @@ public class HBaseAdmin implements Admin { public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily) throws IOException { DeleteColumnResponse response = - executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) { + executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(), + getRpcControllerFactory()) { @Override - public DeleteColumnResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); - + protected DeleteColumnResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - master.deleteColumn(controller, req); + master.deleteColumn(rpcController, req); 
return null; } }); @@ -985,17 +964,16 @@ public class HBaseAdmin implements Admin { public Future<Void> modifyColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily) throws IOException { ModifyColumnResponse response = - executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) { + executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(), + getRpcControllerFactory()) { @Override - public ModifyColumnResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); - + protected ModifyColumnResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - master.modifyColumn(controller, req); + master.modifyColumn(rpcController, req); return null; } }); @@ -1044,28 +1022,26 @@ public class HBaseAdmin implements Admin { @Override public boolean closeRegionWithEncodedRegionName(final String encodedRegionName, final String serverName) throws IOException { - if (null == serverName || ("").equals(serverName.trim())) { - throw new IllegalArgumentException( - "The servername cannot be null or empty."); + if (null == serverName || ("").equals(serverName.trim())) { + throw new IllegalArgumentException("The servername cannot be null or empty."); } - ServerName sn = ServerName.valueOf(serverName); - AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - // Close the region without updating zk state. - CloseRegionRequest request = - RequestConverter.buildCloseRegionRequest(sn, encodedRegionName); - try { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - - // TODO: this does not do retries, it should. 
Set priority and timeout in controller - CloseRegionResponse response = admin.closeRegion(controller, request); - boolean isRegionClosed = response.getClosed(); - if (false == isRegionClosed) { - LOG.error("Not able to close the region " + encodedRegionName + "."); + final ServerName sn = ServerName.valueOf(serverName); + final AdminService.BlockingInterface admin = connection.getAdmin(sn); + final PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + return executeCallable(new RpcRetryingCallable<Boolean>() { + @Override + protected Boolean rpcCall(int callTimeout) throws Exception { + controller.setCallTimeout(callTimeout); + CloseRegionRequest request = + RequestConverter.buildCloseRegionRequest(sn, encodedRegionName); + CloseRegionResponse response = admin.closeRegion(controller, request); + boolean closed = response.getClosed(); + if (false == closed) { + LOG.error("Not able to close the region " + encodedRegionName + "."); + } + return closed; } - return isRegionClosed; - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + }); } @Override @@ -1104,20 +1080,20 @@ public class HBaseAdmin implements Admin { if (regionServerPair.getSecond() == null) { throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); } - HRegionInfo hRegionInfo = regionServerPair.getFirst(); + final HRegionInfo hRegionInfo = regionServerPair.getFirst(); ServerName serverName = regionServerPair.getSecond(); - - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - - AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); - FlushRegionRequest request = - RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); - try { - // TODO: this does not do retries, it should. 
Set priority and timeout in controller - admin.flushRegion(controller, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + final PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); + executeCallable(new RpcRetryingCallable<Void>() { + @Override + protected Void rpcCall(int callTimeout) throws Exception { + controller.setCallTimeout(callTimeout); + FlushRegionRequest request = + RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); + admin.flushRegion(controller, request); + return null; + } + }); } /** @@ -1268,67 +1244,45 @@ public class HBaseAdmin implements Admin { private void compact(final ServerName sn, final HRegionInfo hri, final boolean major, final byte [] family) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - CompactRegionRequest request = - RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); - try { - // TODO: this does not do retries, it should. 
Set priority and timeout in controller - admin.compactRegion(controller, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + final PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + executeCallable(new RpcRetryingCallable<Void>() { + @Override + protected Void rpcCall(int callTimeout) throws Exception { + controller.setCallTimeout(callTimeout); + CompactRegionRequest request = + RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); + admin.compactRegion(controller, request); + return null; + } + }); } @Override public void move(final byte [] encodedRegionName, final byte [] destServerName) - throws IOException { - - executeCallable(new MasterCallable<Void>(getConnection()) { + throws IOException { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - // Hard to know the table name, at least check if meta - if (isMetaRegion(encodedRegionName)) { - controller.setPriority(TableName.META_TABLE_NAME); - } - - try { - MoveRegionRequest request = - RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); - master.moveRegion(controller, request); - } catch (DeserializationException de) { - LOG.error("Could not parse destination server name: " + de); - throw new ServiceException(new DoNotRetryIOException(de)); - } + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + rpcController.setPriority(encodedRegionName); + MoveRegionRequest request = + RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); + master.moveRegion(rpcController, request); return null; } }); } - private boolean isMetaRegion(final 
byte[] regionName) { - return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) - || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); - } - @Override - public void assign(final byte[] regionName) throws MasterNotRunningException, + public void assign(final byte [] regionName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { - final byte[] toBeAssigned = getRegionName(regionName); - executeCallable(new MasterCallable<Void>(getConnection()) { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - // Hard to know the table name, at least check if meta - if (isMetaRegion(regionName)) { - controller.setPriority(TableName.META_TABLE_NAME); - } - + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + rpcController.setPriority(regionName); AssignRegionRequest request = - RequestConverter.buildAssignRegionRequest(toBeAssigned); - master.assignRegion(controller,request); + RequestConverter.buildAssignRegionRequest(getRegionName(regionName)); + master.assignRegion(rpcController, request); return null; } }); @@ -1338,18 +1292,13 @@ public class HBaseAdmin implements Admin { public void unassign(final byte [] regionName, final boolean force) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { final byte[] toBeUnassigned = getRegionName(regionName); - executeCallable(new MasterCallable<Void>(getConnection()) { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - // Hard to 
know the table name, at least check if meta - if (isMetaRegion(regionName)) { - controller.setPriority(TableName.META_TABLE_NAME); - } + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + rpcController.setPriority(regionName); UnassignRegionRequest request = - RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); - master.unassignRegion(controller, request); + RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); + master.unassignRegion(rpcController, request); return null; } }); @@ -1358,16 +1307,11 @@ public class HBaseAdmin implements Admin { @Override public void offline(final byte [] regionName) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection()) { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - // Hard to know the table name, at least check if meta - if (isMetaRegion(regionName)) { - controller.setPriority(TableName.META_TABLE_NAME); - } - master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName)); + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + rpcController.setPriority(regionName); + master.offlineRegion(rpcController, RequestConverter.buildOfflineRegionRequest(regionName)); return null; } }); @@ -1376,56 +1320,44 @@ public class HBaseAdmin implements Admin { @Override public boolean setBalancerRunning(final boolean on, final boolean synchronous) throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = 
rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { SetBalancerRunningRequest req = RequestConverter.buildSetBalancerRunningRequest(on, synchronous); - return master.setBalancerRunning(controller, req).getPrevBalanceValue(); + return master.setBalancerRunning(rpcController, req).getPrevBalanceValue(); } }); } @Override public boolean balancer() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - return master.balance(controller, - RequestConverter.buildBalanceRequest(false)).getBalancerRan(); + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + return master.balance(rpcController, + RequestConverter.buildBalanceRequest(false)).getBalancerRan(); } }); } @Override public boolean balancer(final boolean force) throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - return master.balance(controller, - RequestConverter.buildBalanceRequest(force)).getBalancerRan(); + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + return master.balance(rpcController, + RequestConverter.buildBalanceRequest(force)).getBalancerRan(); } }); } @Override public boolean isBalancerEnabled() throws IOException { - return 
executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - return master.isBalancerEnabled(controller, + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + return master.isBalancerEnabled(rpcController, RequestConverter.buildIsBalancerEnabledRequest()).getEnabled(); } }); @@ -1433,13 +1365,10 @@ public class HBaseAdmin implements Admin { @Override public boolean normalize() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - return master.normalize(controller, + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + return master.normalize(rpcController, RequestConverter.buildNormalizeRequest()).getNormalizerRan(); } }); @@ -1447,13 +1376,10 @@ public class HBaseAdmin implements Admin { @Override public boolean isNormalizerEnabled() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - return master.isNormalizerEnabled(controller, + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + return 
master.isNormalizerEnabled(rpcController, RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled(); } }); @@ -1461,28 +1387,22 @@ public class HBaseAdmin implements Admin { @Override public boolean setNormalizerRunning(final boolean on) throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { SetNormalizerRunningRequest req = RequestConverter.buildSetNormalizerRunningRequest(on); - return master.setNormalizerRunning(controller, req).getPrevNormalizerValue(); + return master.setNormalizerRunning(rpcController, req).getPrevNormalizerValue(); } }); } @Override public boolean enableCatalogJanitor(final boolean enable) throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - return master.enableCatalogJanitor(controller, + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + return master.enableCatalogJanitor(rpcController, RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue(); } }); @@ -1490,13 +1410,10 @@ public class HBaseAdmin implements Admin { @Override public int runCatalogScan() throws IOException { - return executeCallable(new MasterCallable<Integer>(getConnection()) { + return executeCallable(new MasterCallable<Integer>(getConnection(), 
getRpcControllerFactory()) { @Override - public Integer call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - return master.runCatalogScan(controller, + protected Integer call(PayloadCarryingRpcController rpcController) throws Exception { + return master.runCatalogScan(rpcController, RequestConverter.buildCatalogScanRequest()).getScanResult(); } }); @@ -1504,13 +1421,10 @@ public class HBaseAdmin implements Admin { @Override public boolean isCatalogJanitorEnabled() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection()) { + return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { @Override - public Boolean call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - return master.isCatalogJanitorEnabled(controller, + protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + return master.isCatalogJanitorEnabled(rpcController, RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue(); } }); @@ -1616,25 +1530,19 @@ public class HBaseAdmin implements Admin { } DispatchMergingRegionsResponse response = - executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection()) { + executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection(), + getRpcControllerFactory()) { @Override - public DispatchMergingRegionsResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - - try { - DispatchMergingRegionsRequest request = RequestConverter - .buildDispatchMergingRegionsRequest( + protected DispatchMergingRegionsResponse call(PayloadCarryingRpcController rpcController) + 
throws Exception { + DispatchMergingRegionsRequest request = RequestConverter + .buildDispatchMergingRegionsRequest( encodedNameOfRegionA, encodedNameOfRegionB, forcible, ng.getNonceGroup(), ng.newNonce()); - return master.dispatchMergingRegions(controller, request); - } catch (DeserializationException de) { - LOG.error("Could not parse destination server name: " + de); - throw new ServiceException(new DoNotRetryIOException(de)); - } + return master.dispatchMergingRegions(rpcController, request); } }); return new DispatchMergingRegionsFuture(this, tableName, response); @@ -1746,21 +1654,17 @@ public class HBaseAdmin implements Admin { throw new IllegalArgumentException("the specified table name '" + tableName + "' doesn't match with the HTD one: " + htd.getTableName()); } - ModifyTableResponse response = executeCallable( - new MasterCallable<ModifyTableResponse>(getConnection()) { + new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) { @Override - public ModifyTableResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); - + protected ModifyTableResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + rpcController.setPriority(tableName); ModifyTableRequest request = RequestConverter.buildModifyTableRequest( tableName, htd, ng.getNonceGroup(), ng.newNonce()); - return master.modifyTable(controller, request); + return master.modifyTable(rpcController, request); } }); - return new ModifyTableFuture(this, tableName, response); } @@ -1875,9 +1779,9 @@ public class HBaseAdmin implements Admin { */ private TableName checkTableExists(final TableName tableName) throws IOException { - return executeCallable(new ConnectionCallable<TableName>(getConnection()) { + return executeCallable(new RpcRetryingCallable<TableName>() { @Override - public TableName call(int 
callTimeout) throws ServiceException, IOException { + protected TableName rpcCall(int callTimeout) throws Exception { if (!MetaTableAccessor.tableExists(connection, tableName)) { throw new TableNotFoundException(tableName); } @@ -1888,13 +1792,11 @@ public class HBaseAdmin implements Admin { @Override public synchronized void shutdown() throws IOException { - executeCallable(new MasterCallable<Void>(getConnection()) { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(HConstants.HIGH_QOS); - master.shutdown(controller, ShutdownRequest.newBuilder().build()); + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + rpcController.setPriority(HConstants.HIGH_QOS); + master.shutdown(rpcController, ShutdownRequest.newBuilder().build()); return null; } }); @@ -1902,13 +1804,11 @@ public class HBaseAdmin implements Admin { @Override public synchronized void stopMaster() throws IOException { - executeCallable(new MasterCallable<Void>(getConnection()) { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - controller.setPriority(HConstants.HIGH_QOS); - master.stopMaster(controller, StopMasterRequest.newBuilder().build()); + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + rpcController.setPriority(HConstants.HIGH_QOS); + master.stopMaster(rpcController, StopMasterRequest.newBuilder().build()); return null; } }); @@ -1919,43 +1819,41 @@ public class HBaseAdmin implements Admin { throws IOException { String hostname = 
Addressing.parseHostname(hostnamePort); int port = Addressing.parsePort(hostnamePort); - AdminService.BlockingInterface admin = + final AdminService.BlockingInterface admin = this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); - StopServerRequest request = RequestConverter.buildStopServerRequest( - "Called by admin client " + this.connection.toString()); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - - controller.setPriority(HConstants.HIGH_QOS); - try { - // TODO: this does not do retries, it should. Set priority and timeout in controller - admin.stopServer(controller, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + @Override + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + rpcController.setPriority(HConstants.HIGH_QOS); + StopServerRequest request = RequestConverter.buildStopServerRequest( + "Called by admin client " + this.connection.toString()); + admin.stopServer(rpcController, request); + return null; + } + }); } @Override public boolean isMasterInMaintenanceMode() throws IOException { - return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection()) { + return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(), + this.rpcControllerFactory) { @Override - public IsInMaintenanceModeResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return master.isMasterInMaintenanceMode( - controller, IsInMaintenanceModeRequest.newBuilder().build()); + protected IsInMaintenanceModeResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + return master.isMasterInMaintenanceMode(rpcController, + IsInMaintenanceModeRequest.newBuilder().build()); } 
}).getInMaintenanceMode(); } @Override public ClusterStatus getClusterStatus() throws IOException { - return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) { + return executeCallable(new MasterCallable<ClusterStatus>(getConnection(), + this.rpcControllerFactory) { @Override - public ClusterStatus call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); + protected ClusterStatus call(PayloadCarryingRpcController rpcController) throws Exception { GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); - return ProtobufUtil.convert(master.getClusterStatus(controller, req).getClusterStatus()); + return ProtobufUtil.convert(master.getClusterStatus(rpcController, req).getClusterStatus()); } }); } @@ -1996,19 +1894,16 @@ public class HBaseAdmin implements Admin { public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor) throws IOException { CreateNamespaceResponse response = - executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) { - @Override - public CreateNamespaceResponse call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - // TODO: set priority based on NS? - return master.createNamespace(controller, - CreateNamespaceRequest.newBuilder() - .setNamespaceDescriptor(ProtobufUtil - .toProtoNamespaceDescriptor(descriptor)).build() - ); - } - }); + executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(), + getRpcControllerFactory()) { + @Override + protected CreateNamespaceResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + return master.createNamespace(rpcController, + CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil. 
+ toProtoNamespaceDescriptor(descriptor)).build()); + } + }); return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { @Override public String getOperationType() { @@ -2027,16 +1922,16 @@ public class HBaseAdmin implements Admin { public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor) throws IOException { ModifyNamespaceResponse response = - executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) { - @Override - public ModifyNamespaceResponse call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - // TODO: set priority based on NS? - return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder(). - setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); - } - }); + executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(), + getRpcControllerFactory()) { + @Override + protected ModifyNamespaceResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + // TODO: set priority based on NS? + return master.modifyNamespace(rpcController, ModifyNamespaceRequest.newBuilder(). + setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); + } + }); return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { @Override public String getOperationType() { @@ -2055,16 +1950,16 @@ public class HBaseAdmin implements Admin { public Future<Void> deleteNamespaceAsync(final String name) throws IOException { DeleteNamespaceResponse response = - executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) { - @Override - public DeleteNamespaceResponse call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - // TODO: set priority based on NS? 
- return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder(). - setNamespaceName(name).build()); - } - }); + executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(), + getRpcControllerFactory()) { + @Override + protected DeleteNamespaceResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + // TODO: set priority based on NS? + return master.deleteNamespace(rpcController, DeleteNamespaceRequest.newBuilder(). + setNamespaceName(name).build()); + } + }); return new NamespaceFuture(this, name, response.getProcId()) { @Override public String getOperationType() { @@ -2075,100 +1970,94 @@ public class HBaseAdmin implements Admin { @Override public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException { - return - executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) { - @Override - public NamespaceDescriptor call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return ProtobufUtil.toNamespaceDescriptor( - master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder(). + return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(), + getRpcControllerFactory()) { + @Override + protected NamespaceDescriptor call(PayloadCarryingRpcController rpcController) + throws Exception { + return ProtobufUtil.toNamespaceDescriptor( + master.getNamespaceDescriptor(rpcController, GetNamespaceDescriptorRequest.newBuilder(). 
setNamespaceName(name).build()).getNamespaceDescriptor()); - } - }); + } + }); } @Override public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException { - return - executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) { - @Override - public NamespaceDescriptor[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - List<HBaseProtos.NamespaceDescriptor> list = - master.listNamespaceDescriptors(controller, - ListNamespaceDescriptorsRequest.newBuilder().build()) - .getNamespaceDescriptorList(); - NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; - for(int i = 0; i < list.size(); i++) { - res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); - } - return res; - } - }); + return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(), + getRpcControllerFactory()) { + @Override + protected NamespaceDescriptor[] call(PayloadCarryingRpcController rpcController) + throws Exception { + List<HBaseProtos.NamespaceDescriptor> list = + master.listNamespaceDescriptors(rpcController, + ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList(); + NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; + for(int i = 0; i < list.size(); i++) { + res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); + } + return res; + } + }); } @Override public ProcedureInfo[] listProcedures() throws IOException { - return - executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) { - @Override - public ProcedureInfo[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - List<ProcedureProtos.Procedure> procList = master.listProcedures( - controller, ListProceduresRequest.newBuilder().build()).getProcedureList(); - ProcedureInfo[] procInfoList = new 
ProcedureInfo[procList.size()]; - for (int i = 0; i < procList.size(); i++) { - procInfoList[i] = ProcedureUtil.convert(procList.get(i)); - } - return procInfoList; - } - }); + return executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection(), + getRpcControllerFactory()) { + @Override + protected ProcedureInfo[] call(PayloadCarryingRpcController rpcController) + throws Exception { + List<ProcedureProtos.Procedure> procList = master.listProcedures( + rpcController, ListProceduresRequest.newBuilder().build()).getProcedureList(); + ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()]; + for (int i = 0; i < procList.size(); i++) { + procInfoList[i] = ProcedureUtil.convert(procList.get(i)); + } + return procInfoList; + } + }); } @Override public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException { - return - executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { - @Override - public HTableDescriptor[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - List<TableSchema> list = - master.listTableDescriptorsByNamespace(controller, - ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name) - .build()).getTableSchemaList(); - HTableDescriptor[] res = new HTableDescriptor[list.size()]; - for(int i=0; i < list.size(); i++) { - - res[i] = ProtobufUtil.convertToHTableDesc(list.get(i)); - } - return res; - } - }); + return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), + getRpcControllerFactory()) { + @Override + protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController) + throws Exception { + List<TableSchema> list = + master.listTableDescriptorsByNamespace(rpcController, + ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name) + .build()).getTableSchemaList(); + HTableDescriptor[] res = new 
HTableDescriptor[list.size()]; + for(int i=0; i < list.size(); i++) { + + res[i] = ProtobufUtil.convertToHTableDesc(list.get(i)); + } + return res; + } + }); } @Override public TableName[] listTableNamesByNamespace(final String name) throws IOException { - return - executeCallable(new MasterCallable<TableName[]>(getConnection()) { - @Override - public TableName[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - List<HBaseProtos.TableName> tableNames = - master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest. + return executeCallable(new MasterCallable<TableName[]>(getConnection(), + getRpcControllerFactory()) { + @Override + protected TableName[] call(PayloadCarryingRpcController rpcController) + throws Exception { + List<HBaseProtos.TableName> tableNames = + master.listTableNamesByNamespace(rpcController, ListTableNamesByNamespaceRequest. newBuilder().setNamespaceName(name).build()) - .getTableNameList(); - TableName[] result = new TableName[tableNames.size()]; - for (int i = 0; i < tableNames.size(); i++) { - result[i] = ProtobufUtil.toTableName(tableNames.get(i)); - } - return result; - } - }); + .getTableNameList(); + TableName[] result = new TableName[tableNames.size()]; + for (int i = 0; i < tableNames.size(); i++) { + result[i] = ProtobufUtil.toTableName(tableNames.get(i)); + } + return result; + } + }); } /** @@ -2176,10 +2065,26 @@ public class HBaseAdmin implements Admin { * @param conf system configuration * @throws MasterNotRunningException if the master is not running * @throws ZooKeeperConnectionException if unable to connect to zookeeper + * @deprecated since hbase-2.0.0 because throws a ServiceException. We don't want to have + * protobuf as part of our public API. Use {@link #available(Configuration)} */ // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not. 
+ // MOB uses it too. + // NOTE: hbase-2.0.0 removes ServiceException from the throw. + @Deprecated public static void checkHBaseAvailable(Configuration conf) - throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { + throws MasterNotRunningException, ZooKeeperConnectionException, IOException, + com.google.protobuf.ServiceException { + available(conf); + } + + /** + * Is HBase available? Throw an exception if not. + * @param conf system configuration + * @throws ZooKeeperConnectionException if unable to connect to zookeeper + */ + public static void available(final Configuration conf) + throws ZooKeeperConnectionException, InterruptedIOException { Configuration copyOfConf = HBaseConfiguration.create(conf); // We set it to make it fail as soon as possible if HBase is not available copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); @@ -2191,7 +2096,6 @@ public class HBaseAdmin implements Admin { (ClusterConnection) ConnectionFactory.createConnection(copyOfConf); ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection). getKeepAliveZooKeeperWatcher();) { - // This is NASTY. FIX!!!! Dependent on internal implementation!
TODO zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false); connection.isMasterRunning(); @@ -2231,14 +2135,15 @@ public class HBaseAdmin implements Admin { @Override public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames) throws IOException { - return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { + return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), + getRpcControllerFactory()) { @Override - public HTableDescriptor[] call(int callTimeout) throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); + protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController) + throws Exception { GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableNames); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); + return ProtobufUtil. + getHTableDescriptorArray(master.getTableDescriptors(rpcController, req)); } }); } @@ -2276,16 +2181,16 @@ public class HBaseAdmin implements Admin { private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException, FailedLogCloseException { - AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - - try { - // TODO: this does not do retries, it should. 
Set priority and timeout in controller - return admin.rollWALWriter(controller, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + Callable<RollWALWriterResponse> callable = new Callable<RollWALWriterResponse>() { + @Override + public RollWALWriterResponse call() throws Exception { + RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + return admin.rollWALWriter(controller, request); + } + }; + return ProtobufUtil.call(callable); } /** @@ -2321,8 +2226,7 @@ public class HBaseAdmin implements Admin { } byte[][] regionsToFlush = new byte[regionCount][]; for (int i = 0; i < regionCount; i++) { - ByteString region = response.getRegionToFlush(i); - regionsToFlush[i] = region.toByteArray(); + regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i)); } return regionsToFlush; } @@ -2352,28 +2256,31 @@ public class HBaseAdmin implements Admin { @Override public CompactionState getCompactionStateForRegion(final byte[] regionName) throws IOException { - try { - Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName); - if (regionServerPair == null) { - throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)); - } - if (regionServerPair.getSecond() == null) { - throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); - } - ServerName sn = regionServerPair.getSecond(); - AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( - regionServerPair.getFirst().getRegionName(), true); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - // TODO: this does not do retries, it should. 
Set priority and timeout in controller - GetRegionInfoResponse response = admin.getRegionInfo(controller, request); - if (response.getCompactionState() != null) { - return ProtobufUtil.createCompactionState(response.getCompactionState()); - } - return null; - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + final Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName); + if (regionServerPair == null) { + throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)); + } + if (regionServerPair.getSecond() == null) { + throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); } + ServerName sn = regionServerPair.getSecond(); + final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + Callable<CompactionState> callable = new Callable<CompactionState>() { + @Override + public CompactionState call() throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( + regionServerPair.getFirst().getRegionName(), true); + + // TODO: this does not do retries, it should. 
Set priority and timeout in controller + GetRegionInfoResponse response = admin.getRegionInfo(controller, request); + if (response.getCompactionState() != null) { + return ProtobufUtil.createCompactionState(response.getCompactionState()); + } + return null; + } + }; + return ProtobufUtil.call(callable); } @Override @@ -2425,12 +2332,12 @@ public class HBaseAdmin implements Admin { throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e); } LOG.debug("Getting current status of snapshot from master..."); - done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { + done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(), + getRpcControllerFactory()) { @Override - public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return master.isSnapshotDone(controller, request); + protected IsSnapshotDoneResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + return master.isSnapshotDone(rpcController, request); } }); } @@ -2476,12 +2383,12 @@ public class HBaseAdmin implements Admin { final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) .build(); // run the snapshot on the master - return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) { + return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(), + getRpcControllerFactory()) { @Override - public SnapshotResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return master.snapshot(controller, request); + protected SnapshotResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + return master.snapshot(rpcController, request); } }); } @@ -2490,12 +2397,12 @@ 
public class HBaseAdmin implements Admin { public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc) throws IOException, HBaseSnapshotException, UnknownSnapshotException { final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc); - return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { + return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(), + getRpcControllerFactory()) { @Override - public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return master.isSnapshotDone(controller, + protected IsSnapshotDoneResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + return master.isSnapshotDone(rpcController, IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); } }).getDone(); @@ -2674,12 +2581,11 @@ public class HBaseAdmin implements Admin { .setProcedure(builder.build()).build(); // run the procedure on the master ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>( - getConnection()) { + getConnection(), getRpcControllerFactory()) { @Override - public ExecProcedureResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return master.execProcedureWithRet(controller, request); + protected ExecProcedureResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + return master.execProcedureWithRet(rpcController, request); } }); @@ -2701,12 +2607,11 @@ public class HBaseAdmin implements Admin { .setProcedure(builder.build()).build(); // run the procedure on the master ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>( - getConnection()) { + getConnection(), 
getRpcControllerFactory()) { @Override - public ExecProcedureResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return master.execProcedure(controller, request); + protected ExecProcedureResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + return master.execProcedure(rpcController, request); } }); @@ -2750,12 +2655,11 @@ public class HBaseAdmin implements Admin { } final ProcedureDescription desc = builder.build(); return executeCallable( - new MasterCallable<IsProcedureDoneResponse>(getConnection()) { + new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) { @Override - public IsProcedureDoneResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return master.isProcedureDone(controller, IsProcedureDoneRequest + protected IsProcedureDoneResponse call(PayloadCarryingRpcController rpcController) + throws Exception { + return master.isProcedureDone(rpcController, IsProcedureDoneRequest .newBuilder().setProcedure(desc).build()); } }).getDone(); @@ -2781,17 +2685,16 @@ public class HBaseAdmin implements Admin { ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); RestoreSnapshotResponse response = executeCallable( - new MasterCallable<RestoreSnapshotResponse>(getConnection()) { + new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) { @Override - public RestoreSnapshotResponse call(int callTimeout) throws ServiceException { + protected RestoreSnapshotResponse call(PayloadCarryingRpcController rpcController) + throws Exception { final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder() .setSnapshot(snapshot) .setNonceGroup(ng.getNonceGroup()) .setNonce(ng.newNonce()) .build(); - 
PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - return master.restoreSnapshot(controller, request); + return master.restoreSnapshot(rpcController, request); } }); @@ -2828,13 +2731,13 @@ public class HBaseAdmin implements Admin { @Override public List<SnapshotDescription> listSnapshots() throws IOException { - return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) { + return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(), + getRpcControllerFactory()) { @Override - public List<SnapshotDescription> call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); + protected List<SnapshotDescription> call(PayloadCarryingRpcController rpcController) + throws Exception { List<HBaseProtos.SnapshotDescription> snapshotsList = master - .getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build()) + .getCompletedSnapshots(rpcController, GetCompletedSnapshotsRequest.newBuilder().build()) .getSnapshotsList(); List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size()); for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) { @@ -2897,14 +2800,11 @@ public class HBaseAdmin implements Admin { // make sure the snapshot is possibly valid TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName)); // do the delete - executeCallable(new MasterCallable<Void>(getConnection()) { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - master.deleteSnapshot(controller, - DeleteSnapshotRequest.newBuilder(). 
- setSnapshot( + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + master.deleteSnapshot(rpcController, + DeleteSnapshotRequest.newBuilder().setSnapshot( HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build()) .build() ); @@ -2933,12 +2833,10 @@ public class HBaseAdmin implements Admin { } private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection()) { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); - this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder() + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + this.master.deleteSnapshot(rpcController, DeleteSnapshotRequest.newBuilder() .setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build()); return null; } @@ -2967,11 +2865,10 @@ public class HBaseAdmin implements Admin { @Override public void setQuota(final QuotaSettings quota) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection()) { + executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { @Override - public Void call(int callTimeout) throws ServiceException { + protected Void call(PayloadCarryingRpcController rpcController) throws Exception { PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota)); return null; } @@ -2989,8 +2886,8 @@ public class HBaseAdmin implements Admin { } static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable, - RpcRetryingCallerFactory rpcCallerFactory, int 
operationTimeout, - int rpcTimeout) throws IOException { + RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout) + throws IOException { RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout); try { return caller.callWithRetries(callable, operationTimeout); @@ -3008,7 +2905,6 @@ public class HBaseAdmin implements Admin { * Simple {@link Abortable}, throwing RuntimeException on abort. */ private static class ThrowableAbortable implements Abortable { - @Override public void abort(String why, Throwable e) { throw new RuntimeException(why, e); @@ -3026,13 +2922,16 @@ public class HBaseAdmin implements Admin { } @Override - public void updateConfiguration(ServerName server) throws IOException { - try { - this.connection.getAdmin(server).updateConfiguration(null, - UpdateConfigurationRequest.getDefaultInstance()); - } catch (ServiceException e) { - throw ProtobufUtil.getRemoteException(e); - } + public void updateConfiguration(final ServerName server) throws IOException { + final AdminService.BlockingInterface admin = this.connection.getAdmin(server); + Callable<Void> callable = new Callable<Void>() { + @Override + public Void call() throws Exception { + admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance()); + return null; + } + }; + ProtobufUtil.call(callable); } @Override @@ -3045,8 +2944,7 @@ public class HBaseAdmin implements Admin { @Override public int getMasterInfoPort() throws IOException { // TODO: Fix! Reaching into internal implementation!!!! - ConnectionImplementation connection = - (ConnectionImplementation)this.connection; + ConnectionImplementation connection = (ConnectionImplementation)this.connection; ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher(); try { return MasterAddressTracker.getMasterInfoPort(zkw); @@ -3057,8 +2955,7 @@ public class HBaseAdmin implements Admin { private ServerName getMasterAddress() throws IOException { // TODO: Fix! 
Reaching into internal implementation!!!! - ConnectionImplementation connection = - (ConnectionImplementation)this.connection; + ConnectionImplementation connection = (ConnectionImplementation)this.connection; ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher(); try { return MasterAddressTracker.getMasterAddress(zkw); @@ -3069,33 +2966,26 @@ public class HBaseAdmin implements Admin { @Override public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException { - return executeCallable(new MasterCallable<Long>(getConnection()) { + return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) { @Override - public Long call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); + protected Long call(PayloadCarryingRpcController rpcController) throws Exception { MajorCompactionTimestampRequest req = MajorCompactionTimestampRequest.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp(); + return master.getLastMajorCompactionTimestamp(rpcController, req).getCompactionTimestamp(); } }); } @Override public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException { - return executeCallable(new MasterCallable<Long>(getConnection()) { + return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) { @Override - public Long call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setCallTimeout(callTimeout); + protected Long call(PayloadCarryingRpcController rpcController) throws Exception { MajorCompactionTimestampForRegionRequest req = - MajorCompactionTimestampForRegionRequest - .newBuilder() - .setRegion( - RequestConverter + 
MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build(); - return master.getLastMajorCompactionTimestampForRegion(controller, req) + return master.getLastMajorCompactionTimestampForRegion(rpcController, req) .getCompactionTimestamp(); } }); @@ -3134,32 +3024,35 @@ public class HBaseAdmin implements Admin { @Override public void majorCompact(final TableName tableName, CompactType compactType) throws IOException, InterruptedException { - compact(tableName, null, true, compactType); + compact(tableName, null, true, compactType); } /** * {@inheritDoc} */ @Override - public CompactionState getCompactionState(TableName tableName, + public CompactionState getCompactionState(final TableName tableName, CompactType compactType) throws IOException { AdminProtos.GetRegionInfoResponse.CompactionState state = AdminProtos.GetRegionInfoResponse.CompactionState.NONE; checkTableExists(tableName); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + final PayloadCarryingRpcController rpcController = rpcControllerFactory.newController(); switch (compactType) { case MOB: - try { - ServerName master = getMasterAddress(); - HRegionInfo info = getMobRegionInfo(tableName); - GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( - info.getRegionName(), true); - GetRegionInfoResponse response = this.connection.getAdmin(master) - .getRegionInfo(controller, request); - state = response.getCompactionState(); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + final AdminProtos.AdminService.BlockingInterface masterAdmin = + this.connection.getAdmin(getMasterAddress()); + Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable = + new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() { + @Override + public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception { + 
HRegionInfo info = getMobRegionInfo(tableName); + GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( + info.getRegionName(), true); + GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request); + return response.getCompactionState(); + } + }; + state = ProtobufUtil.call(callable); break; case NORMAL: default: @@ -3173,15 +3066,23 @@ public class HBaseAdmin implements Admin { } else { pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName); } - for (Pair<HRegionInfo, ServerName> pair : pairs) { + for (Pair<HRegionInfo, ServerName> pair: pairs) { if (pair.getFirst().isOffline()) continue; if (pair.getSecond() == null) continue; + final ServerName sn = pair.getSecond(); + final byte [] regionName = pair.getFirst().getRegionName(); + final AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn); try { - ServerName sn = pair.getSecond(); - AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( - pair.getFirst().getRegionName(), true); - GetRegionInfoResponse response = admin.getRegionInfo(controller, request); + Callable<GetRegionInfoResponse> regionInfoCallable = + new Callable<GetRegionInfoResponse>() { + @Override + public GetRegionInfoResponse call() throws Exception { + GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( + regionName, true); + return snAdmin.getRegionInfo(rpcController, request); + } + }; + GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable); switch (response.getCompactionState()) { case MAJOR_AND_MINOR: return CompactionState.MAJOR_AND_MINOR; @@ -3217,8 +3118,6 @@ public class HBaseAdmin implements Admin { } } } - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); } finally { if (zookeeper != null) { zookeeper.close(); @@ -3283,12 +3182,11 @@ public class HBaseAdmin implements Admin { protected 
AbortProcedureResponse abortProcedureResult( final AbortProcedureRequest request) throws IOException { return admin.executeCallable(new MasterCallable<AbortProcedureResponse>( - admin.getConnection()) { + admin.getConnection(), admin.getRpcControllerFactory()) { @Override - public AbortProcedureResponse call(int callTimeout) throws ServiceException { - PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newController(); - controller.setCallTimeout(callTimeout); - return master.abortProcedure(controller, request); + protected AbortProcedureResponse call(PayloadCarr
<TRUNCATED>