http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index fa18bd8..29650ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -18,6 +18,10 @@ */ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.protobuf.ServiceException; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -28,7 +32,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -66,6 +69,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.security.SecurityCapability; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; @@ -179,8 +183,6 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; - /** * HBaseAdmin is no longer a client API. It is marked InterfaceAudience.Private indicating that * this is an HBase-internal class as defined in @@ -209,6 +211,10 @@ public class HBaseAdmin implements Admin { private volatile Configuration conf; private final long pause; private final int numRetries; + // Some operations can take a long time such as disable of big table. + // numRetries is for 'normal' stuff... Multiply by this factor when + // want to wait a long time. + private final int retryLongerMultiplier; private final int syncWaitTimeout; private boolean aborted; private int operationTimeout; @@ -233,6 +239,8 @@ public class HBaseAdmin implements Admin { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.retryLongerMultiplier = this.conf.getInt( + "hbase.client.retries.longer.multiplier", 10); this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, @@ -254,7 +262,7 @@ public class HBaseAdmin implements Admin { } @Override - public boolean isAborted() { + public boolean isAborted(){ return this.aborted; } @@ -266,16 +274,18 @@ public class HBaseAdmin implements Admin { } @Override - public Future<Boolean> abortProcedureAsync(final long procId, final boolean mayInterruptIfRunning) - throws IOException { + public Future<Boolean> abortProcedureAsync( + final long procId, + final boolean mayInterruptIfRunning) throws IOException { Boolean abortProcResponse = executeCallable( - new MasterCallable<AbortProcedureResponse>(getConnection(), getRpcControllerFactory()) { + new MasterCallable<AbortProcedureResponse>(getConnection()) { @Override - protected AbortProcedureResponse call(PayloadCarryingRpcController rpcController) - throws Exception { + public AbortProcedureResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); AbortProcedureRequest abortProcRequest = AbortProcedureRequest.newBuilder().setProcId(procId).build(); - return master.abortProcedure(rpcController, abortProcRequest); + return master.abortProcedure(controller, abortProcRequest); } }).getIsProcedureAborted(); @@ -314,9 +324,9 @@ public class HBaseAdmin implements Admin { @Override public boolean tableExists(final TableName tableName) throws IOException { - return executeCallable(new RpcRetryingCallable<Boolean>() { + return executeCallable(new ConnectionCallable<Boolean>(getConnection()) { @Override - protected Boolean rpcCall(int callTimeout) throws Exception { + public Boolean call(int callTimeout) throws ServiceException, IOException { return MetaTableAccessor.tableExists(connection, tableName); } }); @@ -340,15 +350,14 @@ public class HBaseAdmin implements Admin { @Override public HTableDescriptor[] listTables(final Pattern pattern, final boolean includeSysTables) throws IOException { - return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), - getRpcControllerFactory()) { + return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { @Override - protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController) - throws Exception { + public HTableDescriptor[] call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables); - return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(rpcController, - req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); } }); } @@ -377,13 +386,14 @@ public class HBaseAdmin implements Admin { @Override public TableName[] listTableNames(final Pattern pattern, final boolean includeSysTables) throws IOException { - return executeCallable(new MasterCallable<TableName[]>(getConnection(), - getRpcControllerFactory()) { + return executeCallable(new MasterCallable<TableName[]>(getConnection()) { @Override - protected TableName[] call(PayloadCarryingRpcController rpcController) throws Exception { + public TableName[] call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableNamesRequest req = RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables); - return ProtobufUtil.getTableNameArray(master.getTableNames(rpcController, req) + return ProtobufUtil.getTableNameArray(master.getTableNames(controller, req) .getTableNamesList()); } }); @@ -404,25 +414,27 @@ public class HBaseAdmin implements Admin { static HTableDescriptor getTableDescriptor(final TableName tableName, Connection connection, RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, int operationTimeout, int rpcTimeout) throws IOException { - if (tableName == null) return null; - HTableDescriptor htd = - executeCallable(new MasterCallable<HTableDescriptor>(connection, rpcControllerFactory) { - @Override - protected HTableDescriptor call(PayloadCarryingRpcController rpcController) - throws Exception { - GetTableDescriptorsRequest req = - RequestConverter.buildGetTableDescriptorsRequest(tableName); - GetTableDescriptorsResponse htds = master.getTableDescriptors(rpcController, req); - if (!htds.getTableSchemaList().isEmpty()) { - return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0)); + if (tableName == null) return null; + HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) { + @Override + public HTableDescriptor call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + GetTableDescriptorsResponse htds; + GetTableDescriptorsRequest req = + RequestConverter.buildGetTableDescriptorsRequest(tableName); + htds = master.getTableDescriptors(controller, req); + + if (!htds.getTableSchemaList().isEmpty()) { + return ProtobufUtil.convertToHTableDesc(htds.getTableSchemaList().get(0)); + } + return null; } - return null; + }, rpcCallerFactory, operationTimeout, rpcTimeout); + if (htd != null) { + return htd; } - }, rpcCallerFactory, operationTimeout, rpcTimeout); - if (htd != null) { - return htd; - } - throw new TableNotFoundException(tableName.getNameAsString()); + throw new TableNotFoundException(tableName.getNameAsString()); } private long getPauseTime(int tries) { @@ -490,14 +502,15 @@ public class HBaseAdmin implements Admin { } CreateTableResponse response = executeCallable( - new MasterCallable<CreateTableResponse>(getConnection(), getRpcControllerFactory()) { + new MasterCallable<CreateTableResponse>(getConnection()) { @Override - protected CreateTableResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(desc.getTableName()); + public CreateTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(desc.getTableName()); CreateTableRequest request = RequestConverter.buildCreateTableRequest( desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); - return master.createTable(rpcController, request); + return master.createTable(controller, request); } }); return new CreateTableFuture(this, desc, splitKeys, response); @@ -541,14 +554,15 @@ public class HBaseAdmin implements Admin { @Override public Future<Void> deleteTableAsync(final TableName tableName) throws IOException { DeleteTableResponse response = executeCallable( - new MasterCallable<DeleteTableResponse>(getConnection(), getRpcControllerFactory()) { + new MasterCallable<DeleteTableResponse>(getConnection()) { @Override - protected DeleteTableResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public DeleteTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); - return master.deleteTable(rpcController,req); + return master.deleteTable(controller,req); } }); return new DeleteTableFuture(this, tableName, response); @@ -622,16 +636,16 @@ public class HBaseAdmin implements Admin { public Future<Void> truncateTableAsync(final TableName tableName, final boolean preserveSplits) throws IOException { TruncateTableResponse response = - executeCallable(new MasterCallable<TruncateTableResponse>(getConnection(), - getRpcControllerFactory()) { + executeCallable(new MasterCallable<TruncateTableResponse>(getConnection()) { @Override - protected TruncateTableResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public TruncateTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); LOG.info("Started truncating " + tableName); TruncateTableRequest req = RequestConverter.buildTruncateTableRequest( tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce()); - return master.truncateTable(rpcController, req); + return master.truncateTable(controller, req); } }); return new TruncateTableFuture(this, tableName, preserveSplits, response); @@ -687,15 +701,17 @@ public class HBaseAdmin implements Admin { public Future<Void> enableTableAsync(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); EnableTableResponse response = executeCallable( - new MasterCallable<EnableTableResponse>(getConnection(), getRpcControllerFactory()) { + new MasterCallable<EnableTableResponse>(getConnection()) { @Override - protected EnableTableResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public EnableTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + LOG.info("Started enable of " + tableName); EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(),ng.newNonce()); - return master.enableTable(rpcController,req); + return master.enableTable(controller,req); } }); return new EnableTableFuture(this, tableName, response); @@ -751,16 +767,18 @@ public class HBaseAdmin implements Admin { public Future<Void> disableTableAsync(final TableName tableName) throws IOException { TableName.isLegalFullyQualifiedTableName(tableName.getName()); DisableTableResponse response = executeCallable( - new MasterCallable<DisableTableResponse>(getConnection(), getRpcControllerFactory()) { + new MasterCallable<DisableTableResponse>(getConnection()) { @Override - protected DisableTableResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public DisableTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + LOG.info("Started disable of " + tableName); DisableTableRequest req = RequestConverter.buildDisableTableRequest( tableName, ng.getNonceGroup(), ng.newNonce()); - return master.disableTable(rpcController, req); + return master.disableTable(controller, req); } }); return new DisableTableFuture(this, tableName, response); @@ -809,9 +827,9 @@ public class HBaseAdmin implements Admin { @Override public boolean isTableEnabled(final TableName tableName) throws IOException { checkTableExists(tableName); - return executeCallable(new RpcRetryingCallable<Boolean>() { + return executeCallable(new ConnectionCallable<Boolean>(getConnection()) { @Override - protected Boolean rpcCall(int callTimeout) throws Exception { + public Boolean call(int callTimeout) throws ServiceException, IOException { TableState tableState = MetaTableAccessor.getTableState(connection, tableName); if (tableState == null) throw new TableNotFoundException(tableName); @@ -838,15 +856,16 @@ public class HBaseAdmin implements Admin { @Override public Pair<Integer, Integer> getAlterStatus(final TableName tableName) throws IOException { - return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection(), - getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) { @Override - protected Pair<Integer, Integer> call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public Pair<Integer, Integer> call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + GetSchemaAlterStatusRequest req = RequestConverter .buildGetSchemaAlterStatusRequest(tableName); - GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(rpcController, req); + GetSchemaAlterStatusResponse ret = master.getSchemaAlterStatus(controller, req); Pair<Integer, Integer> pair = new Pair<>(ret.getYetToUpdateRegions(), ret.getTotalRegions()); return pair; @@ -875,16 +894,17 @@ public class HBaseAdmin implements Admin { public Future<Void> addColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily) throws IOException { AddColumnResponse response = - executeCallable(new MasterCallable<AddColumnResponse>(getConnection(), - getRpcControllerFactory()) { + executeCallable(new MasterCallable<AddColumnResponse>(getConnection()) { @Override - protected AddColumnResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public AddColumnResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - return master.addColumn(rpcController, req); + return master.addColumn(controller, req); } }); return new AddColumnFamilyFuture(this, tableName, response); @@ -919,16 +939,17 @@ public class HBaseAdmin implements Admin { public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily) throws IOException { DeleteColumnResponse response = - executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(), - getRpcControllerFactory()) { + executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection()) { @Override - protected DeleteColumnResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public DeleteColumnResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - master.deleteColumn(rpcController, req); + master.deleteColumn(controller, req); return null; } }); @@ -964,16 +985,17 @@ public class HBaseAdmin implements Admin { public Future<Void> modifyColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily) throws IOException { ModifyColumnResponse response = - executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(), - getRpcControllerFactory()) { + executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection()) { @Override - protected ModifyColumnResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public ModifyColumnResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()); - master.modifyColumn(rpcController, req); + master.modifyColumn(controller, req); return null; } }); @@ -1022,26 +1044,28 @@ public class HBaseAdmin implements Admin { @Override public boolean closeRegionWithEncodedRegionName(final String encodedRegionName, final String serverName) throws IOException { - if (null == serverName || ("").equals(serverName.trim())) { - throw new IllegalArgumentException("The servername cannot be null or empty."); + if (null == serverName || ("").equals(serverName.trim())) { + throw new IllegalArgumentException( + "The servername cannot be null or empty."); } - final ServerName sn = ServerName.valueOf(serverName); - final AdminService.BlockingInterface admin = connection.getAdmin(sn); - final PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - return executeCallable(new RpcRetryingCallable<Boolean>() { - @Override - protected Boolean rpcCall(int callTimeout) throws Exception { - controller.setCallTimeout(callTimeout); - CloseRegionRequest request = - RequestConverter.buildCloseRegionRequest(sn, encodedRegionName); - CloseRegionResponse response = admin.closeRegion(controller, request); - boolean closed = response.getClosed(); - if (false == closed) { - LOG.error("Not able to close the region " + encodedRegionName + "."); - } - return closed; + ServerName sn = ServerName.valueOf(serverName); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + // Close the region without updating zk state. + CloseRegionRequest request = + RequestConverter.buildCloseRegionRequest(sn, encodedRegionName); + try { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + // TODO: this does not do retries, it should. Set priority and timeout in controller + CloseRegionResponse response = admin.closeRegion(controller, request); + boolean isRegionClosed = response.getClosed(); + if (false == isRegionClosed) { + LOG.error("Not able to close the region " + encodedRegionName + "."); } - }); + return isRegionClosed; + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } @Override @@ -1080,20 +1104,20 @@ public class HBaseAdmin implements Admin { if (regionServerPair.getSecond() == null) { throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); } - final HRegionInfo hRegionInfo = regionServerPair.getFirst(); + HRegionInfo hRegionInfo = regionServerPair.getFirst(); ServerName serverName = regionServerPair.getSecond(); - final PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - final AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); - executeCallable(new RpcRetryingCallable<Void>() { - @Override - protected Void rpcCall(int callTimeout) throws Exception { - controller.setCallTimeout(callTimeout); - FlushRegionRequest request = - RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); - admin.flushRegion(controller, request); - return null; - } - }); + + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + AdminService.BlockingInterface admin = this.connection.getAdmin(serverName); + FlushRegionRequest request = + RequestConverter.buildFlushRegionRequest(hRegionInfo.getRegionName()); + try { + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.flushRegion(controller, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } /** @@ -1244,45 +1268,67 @@ public class HBaseAdmin implements Admin { private void compact(final ServerName sn, final HRegionInfo hri, final boolean major, final byte [] family) throws IOException { - final PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - executeCallable(new RpcRetryingCallable<Void>() { - @Override - protected Void rpcCall(int callTimeout) throws Exception { - controller.setCallTimeout(callTimeout); - CompactRegionRequest request = - RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); - admin.compactRegion(controller, request); - return null; - } - }); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + CompactRegionRequest request = + RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family); + try { + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.compactRegion(controller, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } @Override public void move(final byte [] encodedRegionName, final byte [] destServerName) - throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + throws IOException { + + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - rpcController.setPriority(encodedRegionName); - MoveRegionRequest request = - RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); - master.moveRegion(rpcController, request); + public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(encodedRegionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + + try { + MoveRegionRequest request = + RequestConverter.buildMoveRegionRequest(encodedRegionName, destServerName); + master.moveRegion(controller, request); + } catch (DeserializationException de) { + LOG.error("Could not parse destination server name: " + de); + throw new ServiceException(new DoNotRetryIOException(de)); + } return null; } }); } + private boolean isMetaRegion(final byte[] regionName) { + return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); + } + @Override - public void assign(final byte [] regionName) throws MasterNotRunningException, + public void assign(final byte[] regionName) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + final byte[] toBeAssigned = getRegionName(regionName); + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - rpcController.setPriority(regionName); + public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + AssignRegionRequest request = - RequestConverter.buildAssignRegionRequest(getRegionName(regionName)); - master.assignRegion(rpcController, request); + RequestConverter.buildAssignRegionRequest(toBeAssigned); + master.assignRegion(controller,request); return null; } }); @@ -1292,13 +1338,18 @@ public class HBaseAdmin implements Admin { public void unassign(final byte [] regionName, final boolean force) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { final byte[] toBeUnassigned = getRegionName(regionName); - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - rpcController.setPriority(regionName); + public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } UnassignRegionRequest request = - RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); - master.unassignRegion(rpcController, request); + RequestConverter.buildUnassignRegionRequest(toBeUnassigned, force); + master.unassignRegion(controller, request); return null; } }); @@ -1307,11 +1358,16 @@ public class HBaseAdmin implements Admin { @Override public void offline(final byte [] regionName) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - rpcController.setPriority(regionName); - master.offlineRegion(rpcController, RequestConverter.buildOfflineRegionRequest(regionName)); + public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // Hard to know the table name, at least check if meta + if (isMetaRegion(regionName)) { + controller.setPriority(TableName.META_TABLE_NAME); + } + master.offlineRegion(controller, RequestConverter.buildOfflineRegionRequest(regionName)); return null; } }); @@ -1320,44 +1376,56 @@ public class HBaseAdmin implements Admin { @Override public boolean setBalancerRunning(final boolean on, final boolean synchronous) throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + SetBalancerRunningRequest req = RequestConverter.buildSetBalancerRunningRequest(on, synchronous); - return master.setBalancerRunning(rpcController, req).getPrevBalanceValue(); + return master.setBalancerRunning(controller, req).getPrevBalanceValue(); } }); } @Override public boolean balancer() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { - return master.balance(rpcController, - RequestConverter.buildBalanceRequest(false)).getBalancerRan(); + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.balance(controller, + RequestConverter.buildBalanceRequest(false)).getBalancerRan(); } }); } @Override public boolean balancer(final boolean force) throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { - return master.balance(rpcController, - RequestConverter.buildBalanceRequest(force)).getBalancerRan(); + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.balance(controller, + RequestConverter.buildBalanceRequest(force)).getBalancerRan(); } }); } @Override public boolean isBalancerEnabled() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { - return master.isBalancerEnabled(rpcController, + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isBalancerEnabled(controller, RequestConverter.buildIsBalancerEnabledRequest()).getEnabled(); } }); @@ -1365,10 +1433,13 @@ public class HBaseAdmin implements Admin { @Override public boolean normalize() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { - return master.normalize(rpcController, + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.normalize(controller, RequestConverter.buildNormalizeRequest()).getNormalizerRan(); } }); @@ -1376,10 +1447,13 @@ public class HBaseAdmin implements Admin { @Override public boolean isNormalizerEnabled() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { - return master.isNormalizerEnabled(rpcController, + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isNormalizerEnabled(controller, RequestConverter.buildIsNormalizerEnabledRequest()).getEnabled(); } }); @@ -1387,22 +1461,28 @@ public class HBaseAdmin implements Admin { @Override public boolean setNormalizerRunning(final boolean on) throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + SetNormalizerRunningRequest req = RequestConverter.buildSetNormalizerRunningRequest(on); - return master.setNormalizerRunning(rpcController, req).getPrevNormalizerValue(); + return master.setNormalizerRunning(controller, req).getPrevNormalizerValue(); } }); } @Override public boolean enableCatalogJanitor(final boolean enable) throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { - return master.enableCatalogJanitor(rpcController, + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.enableCatalogJanitor(controller, RequestConverter.buildEnableCatalogJanitorRequest(enable)).getPrevValue(); } }); @@ -1410,10 +1490,13 @@ public class HBaseAdmin implements Admin { @Override public int runCatalogScan() throws IOException { - return executeCallable(new MasterCallable<Integer>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Integer>(getConnection()) { @Override - protected Integer call(PayloadCarryingRpcController rpcController) throws Exception { - return master.runCatalogScan(rpcController, + public Integer call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.runCatalogScan(controller, RequestConverter.buildCatalogScanRequest()).getScanResult(); } }); @@ -1421,10 +1504,13 @@ public class HBaseAdmin implements Admin { @Override public boolean isCatalogJanitorEnabled() throws IOException { - return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Boolean>(getConnection()) { @Override - protected Boolean call(PayloadCarryingRpcController rpcController) throws Exception { - return master.isCatalogJanitorEnabled(rpcController, + public Boolean call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + return master.isCatalogJanitorEnabled(controller, RequestConverter.buildIsCatalogJanitorEnabledRequest()).getValue(); } }); @@ -1530,19 +1616,25 @@ public class HBaseAdmin implements Admin { } DispatchMergingRegionsResponse response = - executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection(), - getRpcControllerFactory()) { + executeCallable(new MasterCallable<DispatchMergingRegionsResponse>(getConnection()) { @Override - protected DispatchMergingRegionsResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - DispatchMergingRegionsRequest request = RequestConverter - .buildDispatchMergingRegionsRequest( + public DispatchMergingRegionsResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + + try { + DispatchMergingRegionsRequest request = RequestConverter + .buildDispatchMergingRegionsRequest( encodedNameOfRegionA, encodedNameOfRegionB, forcible, ng.getNonceGroup(), ng.newNonce()); - return master.dispatchMergingRegions(rpcController, request); + return master.dispatchMergingRegions(controller, request); + } catch (DeserializationException de) { + LOG.error("Could not parse destination server name: " + de); + throw new ServiceException(new DoNotRetryIOException(de)); + } } }); return new DispatchMergingRegionsFuture(this, tableName, response); @@ -1654,17 +1746,21 @@ public class HBaseAdmin implements Admin { throw new IllegalArgumentException("the specified table name '" + tableName + "' doesn't match with the HTD one: " + htd.getTableName()); } + ModifyTableResponse response = executeCallable( - new MasterCallable<ModifyTableResponse>(getConnection(), getRpcControllerFactory()) { + new MasterCallable<ModifyTableResponse>(getConnection()) { @Override - protected ModifyTableResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - rpcController.setPriority(tableName); + public ModifyTableResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(tableName); + ModifyTableRequest request = RequestConverter.buildModifyTableRequest( tableName, htd, ng.getNonceGroup(), ng.newNonce()); - return master.modifyTable(rpcController, request); + return master.modifyTable(controller, request); } }); + return new ModifyTableFuture(this, tableName, response); } @@ -1779,9 +1875,9 @@ public class HBaseAdmin implements Admin { */ private TableName checkTableExists(final TableName tableName) throws IOException { - return executeCallable(new RpcRetryingCallable<TableName>() { + return executeCallable(new ConnectionCallable<TableName>(getConnection()) { @Override - protected TableName rpcCall(int callTimeout) throws Exception { + public TableName call(int callTimeout) throws ServiceException, IOException { if (!MetaTableAccessor.tableExists(connection, tableName)) { throw new TableNotFoundException(tableName); } @@ -1792,11 +1888,13 @@ public class HBaseAdmin implements Admin { @Override public synchronized void shutdown() throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - rpcController.setPriority(HConstants.HIGH_QOS); - master.shutdown(rpcController, ShutdownRequest.newBuilder().build()); + public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(HConstants.HIGH_QOS); + master.shutdown(controller, ShutdownRequest.newBuilder().build()); return null; } }); @@ -1804,11 +1902,13 @@ public class HBaseAdmin implements Admin { @Override public synchronized void stopMaster() throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - rpcController.setPriority(HConstants.HIGH_QOS); - master.stopMaster(rpcController, StopMasterRequest.newBuilder().build()); + public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + controller.setPriority(HConstants.HIGH_QOS); + master.stopMaster(controller, StopMasterRequest.newBuilder().build()); return null; } }); @@ -1819,41 +1919,43 @@ public class HBaseAdmin implements Admin { throws IOException { String hostname = Addressing.parseHostname(hostnamePort); int port = Addressing.parsePort(hostnamePort); - final AdminService.BlockingInterface admin = + AdminService.BlockingInterface admin = this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { - @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - rpcController.setPriority(HConstants.HIGH_QOS); - StopServerRequest request = RequestConverter.buildStopServerRequest( - "Called by admin client " + this.connection.toString()); - admin.stopServer(rpcController, request); - return null; - } - }); + StopServerRequest request = RequestConverter.buildStopServerRequest( + "Called by admin client " + this.connection.toString()); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + controller.setPriority(HConstants.HIGH_QOS); + try { + // TODO: this does not do retries, it should. Set priority and timeout in controller + admin.stopServer(controller, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } @Override public boolean isMasterInMaintenanceMode() throws IOException { - return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection(), - this.rpcControllerFactory) { + return executeCallable(new MasterCallable<IsInMaintenanceModeResponse>(getConnection()) { @Override - protected IsInMaintenanceModeResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - return master.isMasterInMaintenanceMode(rpcController, - IsInMaintenanceModeRequest.newBuilder().build()); + public IsInMaintenanceModeResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isMasterInMaintenanceMode( + controller, IsInMaintenanceModeRequest.newBuilder().build()); } }).getInMaintenanceMode(); } @Override public ClusterStatus getClusterStatus() throws IOException { - return executeCallable(new MasterCallable<ClusterStatus>(getConnection(), - this.rpcControllerFactory) { + return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) { @Override - protected ClusterStatus call(PayloadCarryingRpcController rpcController) throws Exception { + public ClusterStatus call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetClusterStatusRequest req = RequestConverter.buildGetClusterStatusRequest(); - return ProtobufUtil.convert(master.getClusterStatus(rpcController, req).getClusterStatus()); + return ProtobufUtil.convert(master.getClusterStatus(controller, req).getClusterStatus()); } }); } @@ -1894,16 +1996,19 @@ public class HBaseAdmin implements Admin { public Future<Void> createNamespaceAsync(final NamespaceDescriptor descriptor) throws IOException { CreateNamespaceResponse response = - executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection(), - getRpcControllerFactory()) { - @Override - protected CreateNamespaceResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - return master.createNamespace(rpcController, - CreateNamespaceRequest.newBuilder().setNamespaceDescriptor(ProtobufUtil. - toProtoNamespaceDescriptor(descriptor)).build()); - } - }); + executeCallable(new MasterCallable<CreateNamespaceResponse>(getConnection()) { + @Override + public CreateNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + return master.createNamespace(controller, + CreateNamespaceRequest.newBuilder() + .setNamespaceDescriptor(ProtobufUtil + .toProtoNamespaceDescriptor(descriptor)).build() + ); + } + }); return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { @Override public String getOperationType() { @@ -1922,16 +2027,16 @@ public class HBaseAdmin implements Admin { public Future<Void> modifyNamespaceAsync(final NamespaceDescriptor descriptor) throws IOException { ModifyNamespaceResponse response = - executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection(), - getRpcControllerFactory()) { - @Override - protected ModifyNamespaceResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - // TODO: set priority based on NS? - return master.modifyNamespace(rpcController, ModifyNamespaceRequest.newBuilder(). - setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); - } - }); + executeCallable(new MasterCallable<ModifyNamespaceResponse>(getConnection()) { + @Override + public ModifyNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + return master.modifyNamespace(controller, ModifyNamespaceRequest.newBuilder(). + setNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(descriptor)).build()); + } + }); return new NamespaceFuture(this, descriptor.getName(), response.getProcId()) { @Override public String getOperationType() { @@ -1950,16 +2055,16 @@ public class HBaseAdmin implements Admin { public Future<Void> deleteNamespaceAsync(final String name) throws IOException { DeleteNamespaceResponse response = - executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection(), - getRpcControllerFactory()) { - @Override - protected DeleteNamespaceResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - // TODO: set priority based on NS? - return master.deleteNamespace(rpcController, DeleteNamespaceRequest.newBuilder(). - setNamespaceName(name).build()); - } - }); + executeCallable(new MasterCallable<DeleteNamespaceResponse>(getConnection()) { + @Override + public DeleteNamespaceResponse call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + // TODO: set priority based on NS? + return master.deleteNamespace(controller, DeleteNamespaceRequest.newBuilder(). + setNamespaceName(name).build()); + } + }); return new NamespaceFuture(this, name, response.getProcId()) { @Override public String getOperationType() { @@ -1970,94 +2075,100 @@ public class HBaseAdmin implements Admin { @Override public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException { - return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(), - getRpcControllerFactory()) { - @Override - protected NamespaceDescriptor call(PayloadCarryingRpcController rpcController) - throws Exception { - return ProtobufUtil.toNamespaceDescriptor( - master.getNamespaceDescriptor(rpcController, GetNamespaceDescriptorRequest.newBuilder(). + return + executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) { + @Override + public NamespaceDescriptor call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return ProtobufUtil.toNamespaceDescriptor( + master.getNamespaceDescriptor(controller, GetNamespaceDescriptorRequest.newBuilder(). setNamespaceName(name).build()).getNamespaceDescriptor()); - } - }); + } + }); } @Override public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException { - return executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection(), - getRpcControllerFactory()) { - @Override - protected NamespaceDescriptor[] call(PayloadCarryingRpcController rpcController) - throws Exception { - List<HBaseProtos.NamespaceDescriptor> list = - master.listNamespaceDescriptors(rpcController, - ListNamespaceDescriptorsRequest.newBuilder().build()).getNamespaceDescriptorList(); - NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; - for(int i = 0; i < list.size(); i++) { - res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); - } - return res; - } - }); + return + executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) { + @Override + public NamespaceDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + List<HBaseProtos.NamespaceDescriptor> list = + master.listNamespaceDescriptors(controller, + ListNamespaceDescriptorsRequest.newBuilder().build()) + .getNamespaceDescriptorList(); + NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; + for(int i = 0; i < list.size(); i++) { + res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); + } + return res; + } + }); } @Override public ProcedureInfo[] listProcedures() throws IOException { - return executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection(), - getRpcControllerFactory()) { - @Override - protected ProcedureInfo[] call(PayloadCarryingRpcController rpcController) - throws Exception { - List<ProcedureProtos.Procedure> procList = master.listProcedures( - rpcController, ListProceduresRequest.newBuilder().build()).getProcedureList(); - ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()]; - for (int i = 0; i < procList.size(); i++) { - procInfoList[i] = ProcedureUtil.convert(procList.get(i)); - } - return procInfoList; - } - }); + return + executeCallable(new MasterCallable<ProcedureInfo[]>(getConnection()) { + @Override + public ProcedureInfo[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + List<ProcedureProtos.Procedure> procList = master.listProcedures( + controller, ListProceduresRequest.newBuilder().build()).getProcedureList(); + ProcedureInfo[] procInfoList = new ProcedureInfo[procList.size()]; + for (int i = 0; i < procList.size(); i++) { + procInfoList[i] = ProcedureUtil.convert(procList.get(i)); + } + return procInfoList; + } + }); } @Override public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException { - return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), - getRpcControllerFactory()) { - @Override - protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController) - throws Exception { - List<TableSchema> list = - master.listTableDescriptorsByNamespace(rpcController, - ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name) - .build()).getTableSchemaList(); - HTableDescriptor[] res = new HTableDescriptor[list.size()]; - for(int i=0; i < list.size(); i++) { - - res[i] = ProtobufUtil.convertToHTableDesc(list.get(i)); - } - return res; - } - }); + return + executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { + @Override + public HTableDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + List<TableSchema> list = + master.listTableDescriptorsByNamespace(controller, + ListTableDescriptorsByNamespaceRequest.newBuilder().setNamespaceName(name) + .build()).getTableSchemaList(); + HTableDescriptor[] res = new HTableDescriptor[list.size()]; + for(int i=0; i < list.size(); i++) { + + res[i] = ProtobufUtil.convertToHTableDesc(list.get(i)); + } + return res; + } + }); } @Override public TableName[] listTableNamesByNamespace(final String name) throws IOException { - return executeCallable(new MasterCallable<TableName[]>(getConnection(), - getRpcControllerFactory()) { - @Override - protected TableName[] call(PayloadCarryingRpcController rpcController) - throws Exception { - List<HBaseProtos.TableName> tableNames = - master.listTableNamesByNamespace(rpcController, ListTableNamesByNamespaceRequest. + return + executeCallable(new MasterCallable<TableName[]>(getConnection()) { + @Override + public TableName[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + List<HBaseProtos.TableName> tableNames = + master.listTableNamesByNamespace(controller, ListTableNamesByNamespaceRequest. newBuilder().setNamespaceName(name).build()) - .getTableNameList(); - TableName[] result = new TableName[tableNames.size()]; - for (int i = 0; i < tableNames.size(); i++) { - result[i] = ProtobufUtil.toTableName(tableNames.get(i)); - } - return result; - } - }); + .getTableNameList(); + TableName[] result = new TableName[tableNames.size()]; + for (int i = 0; i < tableNames.size(); i++) { + result[i] = ProtobufUtil.toTableName(tableNames.get(i)); + } + return result; + } + }); } /** @@ -2065,26 +2176,10 @@ public class HBaseAdmin implements Admin { * @param conf system configuration * @throws MasterNotRunningException if the master is not running * @throws ZooKeeperConnectionException if unable to connect to zookeeper - * @deprecated since hbase-2.0.0 because throws a ServiceException. We don't want to have - * protobuf as part of our public API. Use {@link #available(Configuration)} */ // Used by tests and by the Merge tool. Merge tool uses it to figure if HBase is up or not. - // MOB uses it too. - // NOTE: hbase-2.0.0 removes ServiceException from the throw. - @Deprecated public static void checkHBaseAvailable(Configuration conf) - throws MasterNotRunningException, ZooKeeperConnectionException, IOException, - com.google.protobuf.ServiceException { - available(conf); - } - - /** - * Is HBase available? Throw an exception if not. - * @param conf system configuration - * @throws ZooKeeperConnectionException if unable to connect to zookeeper] - */ - public static void available(final Configuration conf) - throws ZooKeeperConnectionException, InterruptedIOException { + throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { Configuration copyOfConf = HBaseConfiguration.create(conf); // We set it to make it fail as soon as possible if HBase is not available copyOfConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); @@ -2096,6 +2191,7 @@ public class HBaseAdmin implements Admin { (ClusterConnection) ConnectionFactory.createConnection(copyOfConf); ZooKeeperKeepAliveConnection zkw = ((ConnectionImplementation) connection). getKeepAliveZooKeeperWatcher();) { + // This is NASTY. FIX!!!! Dependent on internal implementation! TODO zkw.getRecoverableZooKeeper().getZooKeeper().exists(zkw.baseZNode, false); connection.isMasterRunning(); @@ -2135,15 +2231,14 @@ public class HBaseAdmin implements Admin { @Override public HTableDescriptor[] getTableDescriptorsByTableName(final List<TableName> tableNames) throws IOException { - return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection(), - getRpcControllerFactory()) { + return executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) { @Override - protected HTableDescriptor[] call(PayloadCarryingRpcController rpcController) - throws Exception { + public HTableDescriptor[] call(int callTimeout) throws Exception { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); GetTableDescriptorsRequest req = RequestConverter.buildGetTableDescriptorsRequest(tableNames); - return ProtobufUtil. - getHTableDescriptorArray(master.getTableDescriptors(rpcController, req)); + return ProtobufUtil.getHTableDescriptorArray(master.getTableDescriptors(controller, req)); } }); } @@ -2181,16 +2276,16 @@ public class HBaseAdmin implements Admin { private RollWALWriterResponse rollWALWriterImpl(final ServerName sn) throws IOException, FailedLogCloseException { - final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - Callable<RollWALWriterResponse> callable = new Callable<RollWALWriterResponse>() { - @Override - public RollWALWriterResponse call() throws Exception { - RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - return admin.rollWALWriter(controller, request); - } - }; - return ProtobufUtil.call(callable); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + + try { + // TODO: this does not do retries, it should. Set priority and timeout in controller + return admin.rollWALWriter(controller, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } /** @@ -2226,7 +2321,8 @@ public class HBaseAdmin implements Admin { } byte[][] regionsToFlush = new byte[regionCount][]; for (int i = 0; i < regionCount; i++) { - regionsToFlush[i] = ProtobufUtil.toBytes(response.getRegionToFlush(i)); + ByteString region = response.getRegionToFlush(i); + regionsToFlush[i] = region.toByteArray(); } return regionsToFlush; } @@ -2256,31 +2352,28 @@ public class HBaseAdmin implements Admin { @Override public CompactionState getCompactionStateForRegion(final byte[] regionName) throws IOException { - final Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName); - if (regionServerPair == null) { - throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)); - } - if (regionServerPair.getSecond() == null) { - throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); - } - ServerName sn = regionServerPair.getSecond(); - final AdminService.BlockingInterface admin = this.connection.getAdmin(sn); - Callable<CompactionState> callable = new Callable<CompactionState>() { - @Override - public CompactionState call() throws Exception { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( - regionServerPair.getFirst().getRegionName(), true); - - // TODO: this does not do retries, it should. Set priority and timeout in controller - GetRegionInfoResponse response = admin.getRegionInfo(controller, request); - if (response.getCompactionState() != null) { - return ProtobufUtil.createCompactionState(response.getCompactionState()); - } - return null; + try { + Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName); + if (regionServerPair == null) { + throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)); + } + if (regionServerPair.getSecond() == null) { + throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); + } + ServerName sn = regionServerPair.getSecond(); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( + regionServerPair.getFirst().getRegionName(), true); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + // TODO: this does not do retries, it should. Set priority and timeout in controller + GetRegionInfoResponse response = admin.getRegionInfo(controller, request); + if (response.getCompactionState() != null) { + return ProtobufUtil.createCompactionState(response.getCompactionState()); } - }; - return ProtobufUtil.call(callable); + return null; + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } @Override @@ -2332,12 +2425,12 @@ public class HBaseAdmin implements Admin { throw (InterruptedIOException)new InterruptedIOException("Interrupted").initCause(e); } LOG.debug("Getting current status of snapshot from master..."); - done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(), - getRpcControllerFactory()) { + done = executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { @Override - protected IsSnapshotDoneResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - return master.isSnapshotDone(rpcController, request); + public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isSnapshotDone(controller, request); } }); } @@ -2383,12 +2476,12 @@ public class HBaseAdmin implements Admin { final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot) .build(); // run the snapshot on the master - return executeCallable(new MasterCallable<SnapshotResponse>(getConnection(), - getRpcControllerFactory()) { + return executeCallable(new MasterCallable<SnapshotResponse>(getConnection()) { @Override - protected SnapshotResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - return master.snapshot(rpcController, request); + public SnapshotResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.snapshot(controller, request); } }); } @@ -2397,12 +2490,12 @@ public class HBaseAdmin implements Admin { public boolean isSnapshotFinished(final SnapshotDescription snapshotDesc) throws IOException, HBaseSnapshotException, UnknownSnapshotException { final HBaseProtos.SnapshotDescription snapshot = createHBaseProtosSnapshotDesc(snapshotDesc); - return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection(), - getRpcControllerFactory()) { + return executeCallable(new MasterCallable<IsSnapshotDoneResponse>(getConnection()) { @Override - protected IsSnapshotDoneResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - return master.isSnapshotDone(rpcController, + public IsSnapshotDoneResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isSnapshotDone(controller, IsSnapshotDoneRequest.newBuilder().setSnapshot(snapshot).build()); } }).getDone(); @@ -2581,11 +2674,12 @@ public class HBaseAdmin implements Admin { .setProcedure(builder.build()).build(); // run the procedure on the master ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>( - getConnection(), getRpcControllerFactory()) { + getConnection()) { @Override - protected ExecProcedureResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - return master.execProcedureWithRet(rpcController, request); + public ExecProcedureResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.execProcedureWithRet(controller, request); } }); @@ -2607,11 +2701,12 @@ public class HBaseAdmin implements Admin { .setProcedure(builder.build()).build(); // run the procedure on the master ExecProcedureResponse response = executeCallable(new MasterCallable<ExecProcedureResponse>( - getConnection(), getRpcControllerFactory()) { + getConnection()) { @Override - protected ExecProcedureResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - return master.execProcedure(rpcController, request); + public ExecProcedureResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.execProcedure(controller, request); } }); @@ -2655,11 +2750,12 @@ public class HBaseAdmin implements Admin { } final ProcedureDescription desc = builder.build(); return executeCallable( - new MasterCallable<IsProcedureDoneResponse>(getConnection(), getRpcControllerFactory()) { + new MasterCallable<IsProcedureDoneResponse>(getConnection()) { @Override - protected IsProcedureDoneResponse call(PayloadCarryingRpcController rpcController) - throws Exception { - return master.isProcedureDone(rpcController, IsProcedureDoneRequest + public IsProcedureDoneResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.isProcedureDone(controller, IsProcedureDoneRequest .newBuilder().setProcedure(desc).build()); } }).getDone(); @@ -2685,16 +2781,17 @@ public class HBaseAdmin implements Admin { ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); RestoreSnapshotResponse response = executeCallable( - new MasterCallable<RestoreSnapshotResponse>(getConnection(), getRpcControllerFactory()) { + new MasterCallable<RestoreSnapshotResponse>(getConnection()) { @Override - protected RestoreSnapshotResponse call(PayloadCarryingRpcController rpcController) - throws Exception { + public RestoreSnapshotResponse call(int callTimeout) throws ServiceException { final RestoreSnapshotRequest request = RestoreSnapshotRequest.newBuilder() .setSnapshot(snapshot) .setNonceGroup(ng.getNonceGroup()) .setNonce(ng.newNonce()) .build(); - return master.restoreSnapshot(rpcController, request); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + return master.restoreSnapshot(controller, request); } }); @@ -2731,13 +2828,13 @@ public class HBaseAdmin implements Admin { @Override public List<SnapshotDescription> listSnapshots() throws IOException { - return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection(), - getRpcControllerFactory()) { + return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) { @Override - protected List<SnapshotDescription> call(PayloadCarryingRpcController rpcController) - throws Exception { + public List<SnapshotDescription> call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); List<HBaseProtos.SnapshotDescription> snapshotsList = master - .getCompletedSnapshots(rpcController, GetCompletedSnapshotsRequest.newBuilder().build()) + .getCompletedSnapshots(controller, GetCompletedSnapshotsRequest.newBuilder().build()) .getSnapshotsList(); List<SnapshotDescription> result = new ArrayList<SnapshotDescription>(snapshotsList.size()); for (HBaseProtos.SnapshotDescription snapshot : snapshotsList) { @@ -2800,11 +2897,14 @@ public class HBaseAdmin implements Admin { // make sure the snapshot is possibly valid TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName)); // do the delete - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - master.deleteSnapshot(rpcController, - DeleteSnapshotRequest.newBuilder().setSnapshot( + public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + master.deleteSnapshot(controller, + DeleteSnapshotRequest.newBuilder(). + setSnapshot( HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build()) .build() ); @@ -2833,10 +2933,12 @@ public class HBaseAdmin implements Admin { } private void internalDeleteSnapshot(final SnapshotDescription snapshot) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { - this.master.deleteSnapshot(rpcController, DeleteSnapshotRequest.newBuilder() + public Void call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); + this.master.deleteSnapshot(controller, DeleteSnapshotRequest.newBuilder() .setSnapshot(createHBaseProtosSnapshotDesc(snapshot)).build()); return null; } @@ -2865,10 +2967,11 @@ public class HBaseAdmin implements Admin { @Override public void setQuota(final QuotaSettings quota) throws IOException { - executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) { + executeCallable(new MasterCallable<Void>(getConnection()) { @Override - protected Void call(PayloadCarryingRpcController rpcController) throws Exception { + public Void call(int callTimeout) throws ServiceException { PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); this.master.setQuota(controller, QuotaSettings.buildSetQuotaRequestProto(quota)); return null; } @@ -2886,8 +2989,8 @@ public class HBaseAdmin implements Admin { } static private <C extends RetryingCallable<V> & Closeable, V> V executeCallable(C callable, - RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout) - throws IOException { + RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, + int rpcTimeout) throws IOException { RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout); try { return caller.callWithRetries(callable, operationTimeout); @@ -2905,6 +3008,7 @@ public class HBaseAdmin implements Admin { * Simple {@link Abortable}, throwing RuntimeException on abort. */ private static class ThrowableAbortable implements Abortable { + @Override public void abort(String why, Throwable e) { throw new RuntimeException(why, e); @@ -2922,16 +3026,13 @@ public class HBaseAdmin implements Admin { } @Override - public void updateConfiguration(final ServerName server) throws IOException { - final AdminService.BlockingInterface admin = this.connection.getAdmin(server); - Callable<Void> callable = new Callable<Void>() { - @Override - public Void call() throws Exception { - admin.updateConfiguration(null, UpdateConfigurationRequest.getDefaultInstance()); - return null; - } - }; - ProtobufUtil.call(callable); + public void updateConfiguration(ServerName server) throws IOException { + try { + this.connection.getAdmin(server).updateConfiguration(null, + UpdateConfigurationRequest.getDefaultInstance()); + } catch (ServiceException e) { + throw ProtobufUtil.getRemoteException(e); + } } @Override @@ -2944,7 +3045,8 @@ public class HBaseAdmin implements Admin { @Override public int getMasterInfoPort() throws IOException { // TODO: Fix! Reaching into internal implementation!!!! - ConnectionImplementation connection = (ConnectionImplementation)this.connection; + ConnectionImplementation connection = + (ConnectionImplementation)this.connection; ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher(); try { return MasterAddressTracker.getMasterInfoPort(zkw); @@ -2955,7 +3057,8 @@ public class HBaseAdmin implements Admin { private ServerName getMasterAddress() throws IOException { // TODO: Fix! Reaching into internal implementation!!!! - ConnectionImplementation connection = (ConnectionImplementation)this.connection; + ConnectionImplementation connection = + (ConnectionImplementation)this.connection; ZooKeeperKeepAliveConnection zkw = connection.getKeepAliveZooKeeperWatcher(); try { return MasterAddressTracker.getMasterAddress(zkw); @@ -2966,26 +3069,33 @@ public class HBaseAdmin implements Admin { @Override public long getLastMajorCompactionTimestamp(final TableName tableName) throws IOException { - return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Long>(getConnection()) { @Override - protected Long call(PayloadCarryingRpcController rpcController) throws Exception { + public Long call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); MajorCompactionTimestampRequest req = MajorCompactionTimestampRequest.newBuilder() .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - return master.getLastMajorCompactionTimestamp(rpcController, req).getCompactionTimestamp(); + return master.getLastMajorCompactionTimestamp(controller, req).getCompactionTimestamp(); } }); } @Override public long getLastMajorCompactionTimestampForRegion(final byte[] regionName) throws IOException { - return executeCallable(new MasterCallable<Long>(getConnection(), getRpcControllerFactory()) { + return executeCallable(new MasterCallable<Long>(getConnection()) { @Override - protected Long call(PayloadCarryingRpcController rpcController) throws Exception { + public Long call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setCallTimeout(callTimeout); MajorCompactionTimestampForRegionRequest req = - MajorCompactionTimestampForRegionRequest.newBuilder().setRegion(RequestConverter + MajorCompactionTimestampForRegionRequest + .newBuilder() + .setRegion( + RequestConverter .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)).build(); - return master.getLastMajorCompactionTimestampForRegion(rpcController, req) + return master.getLastMajorCompactionTimestampForRegion(controller, req) .getCompactionTimestamp(); } }); @@ -3024,35 +3134,32 @@ public class HBaseAdmin implements Admin { @Override public void majorCompact(final TableName tableName, CompactType compactType) throws IOException, InterruptedException { - compact(tableName, null, true, compactType); + compact(tableName, null, true, compactType); } /** * {@inheritDoc} */ @Override - public CompactionState getCompactionState(final TableName tableName, + public CompactionState getCompactionState(TableName tableName, CompactType compactType) throws IOException { AdminProtos.GetRegionInfoResponse.CompactionState state = AdminProtos.GetRegionInfoResponse.CompactionState.NONE; checkTableExists(tableName); - final PayloadCarryingRpcController rpcController = rpcControllerFactory.newController(); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); switch (compactType) { case MOB: - final AdminProtos.AdminService.BlockingInterface masterAdmin = - this.connection.getAdmin(getMasterAddress()); - Callable<AdminProtos.GetRegionInfoResponse.CompactionState> callable = - new Callable<AdminProtos.GetRegionInfoResponse.CompactionState>() { - @Override - public AdminProtos.GetRegionInfoResponse.CompactionState call() throws Exception { - HRegionInfo info = getMobRegionInfo(tableName); - GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( - info.getRegionName(), true); - GetRegionInfoResponse response = masterAdmin.getRegionInfo(rpcController, request); - return response.getCompactionState(); - } - }; - state = ProtobufUtil.call(callable); + try { + ServerName master = getMasterAddress(); + HRegionInfo info = getMobRegionInfo(tableName); + GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( + info.getRegionName(), true); + GetRegionInfoResponse response = this.connection.getAdmin(master) + .getRegionInfo(controller, request); + state = response.getCompactionState(); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } break; case NORMAL: default: @@ -3066,23 +3173,15 @@ public class HBaseAdmin implements Admin { } else { pairs = MetaTableAccessor.getTableRegionsAndLocations(connection, tableName); } - for (Pair<HRegionInfo, ServerName> pair: pairs) { + for (Pair<HRegionInfo, ServerName> pair : pairs) { if (pair.getFirst().isOffline()) continue; if (pair.getSecond() == null) continue; - final ServerName sn = pair.getSecond(); - final byte [] regionName = pair.getFirst().getRegionName(); - final AdminService.BlockingInterface snAdmin = this.connection.getAdmin(sn); try { - Callable<GetRegionInfoResponse> regionInfoCallable = - new Callable<GetRegionInfoResponse>() { - @Override - public GetRegionInfoResponse call() throws Exception { - GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( - regionName, true); - return snAdmin.getRegionInfo(rpcController, request); - } - }; - GetRegionInfoResponse response = ProtobufUtil.call(regionInfoCallable); + ServerName sn = pair.getSecond(); + AdminService.BlockingInterface admin = this.connection.getAdmin(sn); + GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( + pair.getFirst().getRegionName(), true); + GetRegionInfoResponse response = admin.getRegionInfo(controller, request); switch (response.getCompactionState()) { case MAJOR_AND_MINOR: return CompactionState.MAJOR_AND_MINOR; @@ -3118,6 +3217,8 @@ public class HBaseAdmin implements Admin { } } } + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } finally { if (zookeeper != null) { zookeeper.close(); @@ -3182,11 +3283,12 @@ public class HBaseAdmin implements Admin { protected AbortProcedureResponse abortProcedureResult( final AbortProcedureRequest request) throws IOException { return admin.executeCallable(new MasterCallable<AbortProcedureResponse>( - admin.getConnection(), admin.getRpcControllerFactory()) { + admin.getConnection()) { @Override - protected AbortProcedureResponse call(PayloadCarryingRpcController rcpController) - throws Exception { - return master.abortProcedure(rcpController, request); + public AbortProcedureResponse call(int callTimeout) throws ServiceException { + PayloadCarryingRpcController controller = admin.getRpcControllerFactory().newCon
<TRUNCATED>