http://git-wip-us.apache.org/repos/asf/hbase/blob/2d781aa1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index c972b4c..8505241 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -40,10 +40,13 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.util.Timeout; import io.netty.util.TimerTask; + import java.util.stream.Stream; + import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.directory.api.util.OptionalComponentsMonitor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; @@ -190,7 +193,6 @@ import org.apache.hadoop.hbase.util.Pair; * The implementation of AsyncAdmin. */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class AsyncHBaseAdmin implements AsyncAdmin { public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; @@ -278,7 +280,6 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return future; } - //TODO abstract call and adminCall into a single method. private <PREQ, PRESP, RESP> CompletableFuture<RESP> adminCall(HBaseRpcController controller, AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> rpcCall, Converter<RESP, PRESP> respConverter) { @@ -318,25 +319,26 @@ public class AsyncHBaseAdmin implements AsyncAdmin { CompletableFuture<Void> operate(TableName table); } - private CompletableFuture<TableDescriptor[]> batchTableOperations(Pattern pattern, + private CompletableFuture<List<TableDescriptor>> batchTableOperations(Pattern pattern, TableOperator operator, String operationType) { - CompletableFuture<TableDescriptor[]> future = new CompletableFuture<>(); + CompletableFuture<List<TableDescriptor>> future = new CompletableFuture<>(); List<TableDescriptor> failed = new LinkedList<>(); - listTables(pattern, false).whenComplete( + listTables(Optional.ofNullable(pattern), false).whenComplete( (tables, error) -> { if (error != null) { future.completeExceptionally(error); return; } - CompletableFuture[] futures = Arrays.stream(tables) - .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> { - if (ex != null) { - LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex); - failed.add(table); - } - })).<CompletableFuture> toArray(size -> new CompletableFuture[size]); + CompletableFuture[] futures = + tables.stream() + .map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> { + if (ex != null) { + LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex); + failed.add(table); + } + })).<CompletableFuture> toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).thenAccept((v) -> { - future.complete(failed.toArray(new TableDescriptor[failed.size()])); + future.complete(failed); }); }); return future; @@ -353,47 +355,28 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<TableDescriptor[]> listTables() { - return listTables((Pattern) null, false); - } - - @Override - public CompletableFuture<TableDescriptor[]> listTables(String regex, boolean includeSysTables) { - return listTables(Pattern.compile(regex), false); - } - - @Override - public CompletableFuture<TableDescriptor[]> listTables(Pattern pattern, boolean includeSysTables) { - return this - .<TableDescriptor[]>newMasterCaller() - .action( - (controller, stub) -> this - .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, TableDescriptor[]> call( - controller, stub, RequestConverter.buildGetTableDescriptorsRequest(pattern, - includeSysTables), (s, c, req, done) -> s.getTableDescriptors(c, req, done), ( - resp) -> ProtobufUtil.getTableDescriptorArray(resp))).call(); - } - - @Override - public CompletableFuture<TableName[]> listTableNames() { - return listTableNames((Pattern) null, false); - } - - @Override - public CompletableFuture<TableName[]> listTableNames(String regex, boolean includeSysTables) { - return listTableNames(Pattern.compile(regex), false); + public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> pattern, + boolean includeSysTables) { + return this.<List<TableDescriptor>> newMasterCaller() + .action((controller, stub) -> this + .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableDescriptor>> call( + controller, stub, + RequestConverter.buildGetTableDescriptorsRequest(pattern, includeSysTables), + (s, c, req, done) -> s.getTableDescriptors(c, req, done), + (resp) -> ProtobufUtil.toTableDescriptorList(resp))) + .call(); } @Override - public CompletableFuture<TableName[]> listTableNames(Pattern pattern, boolean includeSysTables) { - return this - .<TableName[]>newMasterCaller() - .action( - (controller, stub) -> this - .<GetTableNamesRequest, GetTableNamesResponse, TableName[]> call(controller, stub, - RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables), (s, c, req, - done) -> s.getTableNames(c, req, done), (resp) -> ProtobufUtil - .getTableNameArray(resp.getTableNamesList()))).call(); + public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> pattern, + boolean includeSysTables) { + return this.<List<TableName>> newMasterCaller() + .action((controller, stub) -> this + .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> call(controller, stub, + RequestConverter.buildGetTableNamesRequest(pattern, includeSysTables), + (s, c, req, done) -> s.getTableNames(c, req, done), + (resp) -> ProtobufUtil.toTableNameList(resp.getTableNamesList()))) + .call(); } @Override @@ -472,12 +455,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<TableDescriptor[]> deleteTables(String regex) { - return deleteTables(Pattern.compile(regex)); - } - - @Override - public CompletableFuture<TableDescriptor[]> deleteTables(Pattern pattern) { + public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern pattern) { return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE"); } @@ -498,12 +476,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<TableDescriptor[]> enableTables(String regex) { - return enableTables(Pattern.compile(regex)); - } - - @Override - public CompletableFuture<TableDescriptor[]> enableTables(Pattern pattern) { + public CompletableFuture<List<TableDescriptor>> enableTables(Pattern pattern) { return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE"); } @@ -516,16 +489,10 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<TableDescriptor[]> disableTables(String regex) { - return disableTables(Pattern.compile(regex)); - } - - @Override - public CompletableFuture<TableDescriptor[]> disableTables(Pattern pattern) { + public CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern) { return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE"); } - @Override public CompletableFuture<Boolean> isTableEnabled(TableName tableName) { CompletableFuture<Boolean> future = new CompletableFuture<>(); @@ -577,7 +544,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { if (!enabled) { future.complete(false); } else { - AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, Optional.of(tableName)) + AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)) .whenComplete( (locations, error1) -> { if (error1 != null) { @@ -586,12 +553,12 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } int notDeployed = 0; int regionCount = 0; - for (Pair<HRegionInfo, ServerName> pair : locations) { - HRegionInfo info = pair.getFirst(); - if (pair.getSecond() == null) { + for (HRegionLocation location : locations) { + HRegionInfo info = location.getRegionInfo(); + if (location.getServerName() == null) { if (LOG.isDebugEnabled()) { LOG.debug("Table " + tableName + " has not deployed region " - + pair.getFirst().getEncodedName()); + + info.getEncodedName()); } notDeployed++; } else if (splitKeys != null @@ -706,21 +673,21 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<NamespaceDescriptor[]> listNamespaceDescriptors() { + public CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors() { return this - .<NamespaceDescriptor[]> newMasterCaller() + .<List<NamespaceDescriptor>> newMasterCaller() .action( (controller, stub) -> this - .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, NamespaceDescriptor[]> call( + .<ListNamespaceDescriptorsRequest, ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call( controller, stub, ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req, done) -> s.listNamespaceDescriptors(c, req, done), (resp) -> ProtobufUtil - .getNamespaceDescriptorArray(resp))).call(); + .toNamespaceDescriptorList(resp))).call(); } @Override - public CompletableFuture<Boolean> setBalancerRunning(final boolean on) { + public CompletableFuture<Boolean> setBalancerOn(final boolean on) { return this - .<Boolean>newMasterCaller() + .<Boolean> newMasterCaller() .action( (controller, stub) -> this .<SetBalancerRunningRequest, SetBalancerRunningResponse, Boolean> call(controller, @@ -730,24 +697,19 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Boolean> balancer() { - return balancer(false); - } - - @Override - public CompletableFuture<Boolean> balancer(boolean force) { + public CompletableFuture<Boolean> balance(boolean forcible) { return this - .<Boolean>newMasterCaller() + .<Boolean> newMasterCaller() .action( (controller, stub) -> this.<BalanceRequest, BalanceResponse, Boolean> call(controller, - stub, RequestConverter.buildBalanceRequest(force), + stub, RequestConverter.buildBalanceRequest(forcible), (s, c, req, done) -> s.balance(c, req, done), (resp) -> resp.getBalancerRan())).call(); } @Override - public CompletableFuture<Boolean> isBalancerEnabled() { + public CompletableFuture<Boolean> isBalancerOn() { return this - .<Boolean>newMasterCaller() + .<Boolean> newMasterCaller() .action( (controller, stub) -> this.<IsBalancerEnabledRequest, IsBalancerEnabledResponse, Boolean> call( controller, stub, RequestConverter.buildIsBalancerEnabledRequest(), @@ -756,109 +718,38 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> closeRegion(String regionname, String serverName) { - return closeRegion(Bytes.toBytes(regionname), serverName); - } - - @Override - public CompletableFuture<Void> closeRegion(byte[] regionName, String serverName) { - CompletableFuture<Void> future = new CompletableFuture<>(); - getRegion(regionName).whenComplete((p, err) -> { + public CompletableFuture<Boolean> closeRegion(byte[] regionName, Optional<ServerName> serverName) { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + getRegionLocation(regionName).whenComplete((location, err) -> { if (err != null) { future.completeExceptionally(err); return; } - if (p == null || p.getFirst() == null) { - future.completeExceptionally(new UnknownRegionException(Bytes.toStringBinary(regionName))); - return; - } - if (serverName != null) { - closeRegion(ServerName.valueOf(serverName), p.getFirst()).whenComplete((p2, err2) -> { + ServerName server = serverName.isPresent() ? serverName.get() : location.getServerName(); + if (server == null) { + future.completeExceptionally(new NotServingRegionException(regionName)); + } else { + closeRegion(location.getRegionInfo(), server).whenComplete((result, err2) -> { if (err2 != null) { future.completeExceptionally(err2); - }else{ - future.complete(null); + } else { + future.complete(result); } }); - } else { - if (p.getSecond() == null) { - future.completeExceptionally(new NotServingRegionException(regionName)); - } else { - closeRegion(p.getSecond(), p.getFirst()).whenComplete((p2, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - }else{ - future.complete(null); - } - }); - } } }); return future; } - CompletableFuture<Pair<HRegionInfo, ServerName>> getRegion(byte[] regionName) { - if (regionName == null) { - return failedFuture(new IllegalArgumentException("Pass region name")); - } - CompletableFuture<Pair<HRegionInfo, ServerName>> future = new CompletableFuture<>(); - AsyncMetaTableAccessor.getRegion(metaTable, regionName).whenComplete( - (p, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else if (p != null) { - future.complete(p); - } else { - metaTable.scanAll( - new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)) - .whenComplete((results, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - return; - } - String encodedName = Bytes.toString(regionName); - if (results != null && !results.isEmpty()) { - for (Result r : results) { - if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue; - RegionLocations rl = MetaTableAccessor.getRegionLocations(r); - if (rl != null) { - for (HRegionLocation h : rl.getRegionLocations()) { - if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) { - future.complete(new Pair<>(h.getRegionInfo(), h.getServerName())); - return; - } - } - } - } - } - future.complete(null); - }); - } - }); - return future; - } - - @Override - public CompletableFuture<Boolean> closeRegionWithEncodedRegionName(String encodedRegionName, - String serverName) { + private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName serverName) { return this .<Boolean> newAdminCaller() .action( (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Boolean> adminCall( controller, stub, - ProtobufUtil.buildCloseRegionRequest(ServerName.valueOf(serverName), encodedRegionName), - (s, c, req, done) -> s.closeRegion(controller, req, done), (resp) -> resp.getClosed())) - .serverName(ServerName.valueOf(serverName)).call(); - } - - @Override - public CompletableFuture<Void> closeRegion(ServerName sn, HRegionInfo hri) { - return this.<Void> newAdminCaller() - .action( - (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Void> adminCall( - controller, stub, ProtobufUtil.buildCloseRegionRequest(sn, hri.getRegionName()), - (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> null)) - .serverName(sn).call(); + ProtobufUtil.buildCloseRegionRequest(serverName, hri.getRegionName()), + (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> resp.getClosed())) + .serverName(serverName).call(); } @Override @@ -905,75 +796,54 @@ public class AsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> flushRegion(byte[] regionName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegion(regionName).whenComplete((p, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - if (p == null || p.getFirst() == null) { - future.completeExceptionally( - new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName))); - return; - } - if (p.getSecond() == null) { - future.completeExceptionally( - new NoServerForRegionException(Bytes.toStringBinary(regionName))); - return; - } + getRegionLocation(regionName).whenComplete( + (location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future.completeExceptionally(new NoServerForRegionException(Bytes + .toStringBinary(regionName))); + return; + } - this.<Void> newAdminCaller().serverName(p.getSecond()) - .action((controller, stub) -> this - .<FlushRegionRequest, FlushRegionResponse, Void> adminCall(controller, stub, - RequestConverter.buildFlushRegionRequest(p.getFirst().getRegionName()), - (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null)) - .call().whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); - } - }); - }); + HRegionInfo regionInfo = location.getRegionInfo(); + this.<Void> newAdminCaller() + .serverName(serverName) + .action( + (controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall( + controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo + .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), + resp -> null)).call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); return future; } @Override - public CompletableFuture<Void> compact(TableName tableName) { - return compact(tableName, null, false, CompactType.NORMAL); - } - - @Override - public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) { + public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> columnFamily) { return compact(tableName, columnFamily, false, CompactType.NORMAL); } @Override - public CompletableFuture<Void> compactRegion(byte[] regionName) { - return compactRegion(regionName, null, false); - } - - @Override - public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) { + public CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily) { return compactRegion(regionName, columnFamily, false); } @Override - public CompletableFuture<Void> majorCompact(TableName tableName) { - return compact(tableName, null, true, CompactType.NORMAL); - } - - @Override - public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) { + public CompletableFuture<Void> majorCompact(TableName tableName, Optional<byte[]> columnFamily) { return compact(tableName, columnFamily, true, CompactType.NORMAL); } @Override - public CompletableFuture<Void> majorCompactRegion(byte[] regionName) { - return compactRegion(regionName, null, true); - } - - @Override - public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) { + public CompletableFuture<Void> majorCompactRegion(byte[] regionName, Optional<byte[]> columnFamily) { return compactRegion(regionName, columnFamily, true); } @@ -996,7 +866,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } List<CompletableFuture<Void>> compactFutures = new ArrayList<>(); if (hRegionInfos != null) { - hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); + hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, Optional.empty()))); } CompletableFuture .allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])) @@ -1011,33 +881,30 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return future; } - private CompletableFuture<Void> compactRegion(final byte[] regionName, final byte[] columnFamily, - final boolean major) { + private CompletableFuture<Void> compactRegion(byte[] regionName, Optional<byte[]> columnFamily, + boolean major) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegion(regionName).whenComplete((p, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - if (p == null || p.getFirst() == null) { - future.completeExceptionally( - new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName))); - return; - } - if (p.getSecond() == null) { - // found a region without region server assigned. - future.completeExceptionally( - new NoServerForRegionException(Bytes.toStringBinary(regionName))); - return; - } - compact(p.getSecond(), p.getFirst(), major, columnFamily).whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); + getRegionLocation(regionName).whenComplete( + (location, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future.completeExceptionally(new NoServerForRegionException(Bytes + .toStringBinary(regionName))); + return; } + compact(location.getServerName(), location.getRegionInfo(), major, columnFamily) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); }); - }); return future; } @@ -1045,45 +912,34 @@ public class AsyncHBaseAdmin implements AsyncAdmin { * List all region locations for the specific table. */ private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) { - CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); if (TableName.META_TABLE_NAME.equals(tableName)) { + CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>(); // For meta table, we use zk to fetch all locations. AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration()); - registry.getMetaRegionLocation().whenComplete((metaRegions, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else if (metaRegions == null || metaRegions.isEmpty() - || metaRegions.getDefaultRegionLocation() == null) { - future.completeExceptionally(new IOException("meta region does not found")); - } else { - future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); - } - // close the registry. - IOUtils.closeQuietly(registry); - }); + registry.getMetaRegionLocation().whenComplete( + (metaRegions, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (metaRegions == null || metaRegions.isEmpty() + || metaRegions.getDefaultRegionLocation() == null) { + future.completeExceptionally(new IOException("meta region does not found")); + } else { + future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); + } + // close the registry. + IOUtils.closeQuietly(registry); + }); + return future; } else { // For non-meta table, we fetch all locations by scanning hbase:meta table - AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, Optional.of(tableName)) - .whenComplete((locations, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else if (locations == null || locations.isEmpty()) { - future.complete(Collections.emptyList()); - } else { - List<HRegionLocation> regionLocations = locations.stream() - .map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond())) - .collect(Collectors.toList()); - future.complete(regionLocations); - } - }); + return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)); } - return future; } /** * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() */ - private CompletableFuture<Void> compact(final TableName tableName, final byte[] columnFamily, + private CompletableFuture<Void> compact(final TableName tableName, Optional<byte[]> columnFamily, final boolean major, CompactType compactType) { if (CompactType.MOB.equals(compactType)) { // TODO support MOB compact. @@ -1120,13 +976,15 @@ public class AsyncHBaseAdmin implements AsyncAdmin { * Compact the region at specific region server. */ private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri, - final boolean major, final byte[] family) { - return this.<Void> newAdminCaller().serverName(sn) - .action((controller, stub) -> this - .<CompactRegionRequest, CompactRegionResponse, Void> adminCall(controller, stub, - RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family), - (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) - .call(); + final boolean major, Optional<byte[]> columnFamily) { + return this + .<Void> newAdminCaller() + .serverName(sn) + .action( + (controller, stub) -> this.<CompactRegionRequest, CompactRegionResponse, Void> adminCall( + controller, stub, RequestConverter.buildCompactRegionRequest(hri.getRegionName(), + major, columnFamily), (s, c, req, done) -> s.compactRegion(c, req, done), + resp -> null)).call(); } private byte[] toEncodeRegionName(byte[] regionName) { @@ -1140,32 +998,29 @@ public class AsyncHBaseAdmin implements AsyncAdmin { private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName, CompletableFuture<TableName> result) { - getRegion(encodeRegionName).whenComplete((p, err) -> { - if (err != null) { - result.completeExceptionally(err); - return; - } - if (p == null) { - result.completeExceptionally(new UnknownRegionException( - "Can't invoke merge on unknown region " + Bytes.toStringBinary(encodeRegionName))); - return; - } - if (p.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { - result.completeExceptionally( - new IllegalArgumentException("Can't invoke merge on non-default regions directly")); - return; - } - if (!tableName.compareAndSet(null, p.getFirst().getTable())) { - if (!tableName.get().equals(p.getFirst().getTable())) { - // tables of this two region should be same. - result.completeExceptionally( - new IllegalArgumentException("Cannot merge regions from two different tables " - + tableName.get() + " and " + p.getFirst().getTable())); - } else { - result.complete(tableName.get()); + getRegionLocation(encodeRegionName).whenComplete( + (location, err) -> { + if (err != null) { + result.completeExceptionally(err); + return; } - } - }); + HRegionInfo regionInfo = location.getRegionInfo(); + if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + result.completeExceptionally(new IllegalArgumentException( + "Can't invoke merge on non-default regions directly")); + return; + } + if (!tableName.compareAndSet(null, regionInfo.getTable())) { + if (!tableName.get().equals(regionInfo.getTable())) { + // tables of this two region should be same. + result.completeExceptionally(new IllegalArgumentException( + "Cannot merge regions from two different tables " + tableName.get() + " and " + + regionInfo.getTable())); + } else { + result.complete(tableName.get()); + } + } + }); } private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] encodeRegionNameA, @@ -1249,7 +1104,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { if (hri == null || hri.isSplitParent() || hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) continue; - splitFutures.add(split(h.getServerName(), hri, null)); + splitFutures.add(split(h.getServerName(), hri, Optional.empty())); } } } @@ -1272,11 +1127,6 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> splitRegion(byte[] regionName) { - return splitRegion(regionName, null); - } - - @Override public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) { CompletableFuture<Void> result = new CompletableFuture<>(); if (splitPoint == null) { @@ -1290,7 +1140,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { result.completeExceptionally(new IllegalArgumentException( "Region does not found: rowKey=" + Bytes.toStringBinary(splitPoint))); } else { - splitRegion(loc.getRegionInfo().getRegionName(), splitPoint) + splitRegion(loc.getRegionInfo().getRegionName(), Optional.of(splitPoint)) .whenComplete((ret, err2) -> { if (err2 != null) { result.completeExceptionally(err2); @@ -1305,182 +1155,149 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint) { + public CompletableFuture<Void> splitRegion(byte[] regionName, Optional<byte[]> splitPoint) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegion(regionName).whenComplete((p, err) -> { - if (p == null) { - future.completeExceptionally( - new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName))); - return; - } - if (p.getFirst() != null && p.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { - future.completeExceptionally(new IllegalArgumentException("Can't split replicas directly. " - + "Replicas are auto-split when their primary is split.")); - return; - } - if (p.getSecond() == null) { - future.completeExceptionally( - new NoServerForRegionException(Bytes.toStringBinary(regionName))); - return; - } - split(p.getSecond(), p.getFirst(), splitPoint).whenComplete((ret, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else { - future.complete(ret); + getRegionLocation(regionName).whenComplete( + (location, err) -> { + HRegionInfo regionInfo = location.getRegionInfo(); + if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { + future.completeExceptionally(new IllegalArgumentException( + "Can't split replicas directly. " + + "Replicas are auto-split when their primary is split.")); + return; } + ServerName serverName = location.getServerName(); + if (serverName == null) { + future.completeExceptionally(new NoServerForRegionException(Bytes + .toStringBinary(regionName))); + return; + } + split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); }); - }); return future; } - @VisibleForTesting - public CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri, - byte[] splitPoint) { - if (hri.getStartKey() != null && splitPoint != null - && Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) { - return failedFuture( - new IllegalArgumentException("should not give a splitkey which equals to startkey!")); + private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo hri, + Optional<byte[]> splitPoint) { + if (hri.getStartKey() != null && splitPoint.isPresent() + && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) { + return failedFuture(new IllegalArgumentException( + "should not give a splitkey which equals to startkey!")); } - return this.<Void> newAdminCaller() + return this + .<Void> newAdminCaller() .action( (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, Void> adminCall( - controller, stub, ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint), + controller, stub, + ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint), (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> null)) .serverName(sn).call(); } - /** - * Turn regionNameOrEncodedRegionName into regionName, if region does not found, then it'll throw - * an IllegalArgumentException wrapped by a {@link CompletableFuture} - * @param regionNameOrEncodedRegionName - * @return - */ - CompletableFuture<byte[]> getRegionName(byte[] regionNameOrEncodedRegionName) { - CompletableFuture<byte[]> future = new CompletableFuture<>(); - if (Bytes - .equals(regionNameOrEncodedRegionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) - || Bytes.equals(regionNameOrEncodedRegionName, - HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { - future.complete(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); - return future; - } - - getRegion(regionNameOrEncodedRegionName).whenComplete((p, err) -> { - if (err != null) { - future.completeExceptionally(err); - } - if (p != null && p.getFirst() != null) { - future.complete(p.getFirst().getRegionName()); - } else { - future.completeExceptionally( - new IllegalArgumentException("Invalid region name or encoded region name: " - + Bytes.toStringBinary(regionNameOrEncodedRegionName))); - } - }); - return future; - } - @Override public CompletableFuture<Void> assign(byte[] regionName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionName(regionName).whenComplete((fullRegionName, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else { + getRegionInfo(regionName).whenComplete( + (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } this.<Void> newMasterCaller() .action( ((controller, stub) -> this.<AssignRegionRequest, AssignRegionResponse, Void> call( - controller, stub, RequestConverter.buildAssignRegionRequest(fullRegionName), - (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) - .call().whenComplete((ret, err2) -> { + controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo + .getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done), + resp -> null))).call().whenComplete((ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { future.complete(ret); } }); - } - }); + }); return future; } @Override - public CompletableFuture<Void> unassign(byte[] regionName, boolean force) { + public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionName(regionName).whenComplete((fullRegionName, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else { + getRegionInfo(regionName).whenComplete( + (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } this.<Void> newMasterCaller() - .action(((controller, stub) -> this - .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, - RequestConverter.buildUnassignRegionRequest(fullRegionName, force), - (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))) - .call().whenComplete((ret, err2) -> { + .action( + ((controller, stub) -> this + .<UnassignRegionRequest, UnassignRegionResponse, Void> call(controller, stub, + RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), + (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> null))).call() + .whenComplete((ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { future.complete(ret); } }); - } - }); + }); return future; } @Override public CompletableFuture<Void> offline(byte[] regionName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionName(regionName).whenComplete((fullRegionName, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else { + getRegionInfo(regionName).whenComplete( + (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } this.<Void> newMasterCaller() .action( ((controller, stub) -> this.<OfflineRegionRequest, OfflineRegionResponse, Void> call( - controller, stub, RequestConverter.buildOfflineRegionRequest(fullRegionName), - (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> null))) - .call().whenComplete((ret, err2) -> { + controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo + .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, req, done), + resp -> null))).call().whenComplete((ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { future.complete(ret); } }); - } - }); + }); return future; } @Override - public CompletableFuture<Void> move(byte[] regionName, byte[] destServerName) { + public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> destServerName) { CompletableFuture<Void> future = new CompletableFuture<>(); - getRegionName(regionName).whenComplete((fullRegionName, err) -> { - if (err != null) { - future.completeExceptionally(err); - } else { - final MoveRegionRequest request; - try { - request = RequestConverter.buildMoveRegionRequest( - Bytes.toBytes(HRegionInfo.encodeRegionName(fullRegionName)), destServerName); - } catch (DeserializationException e) { - future.completeExceptionally(e); + getRegionInfo(regionName).whenComplete( + (regionInfo, err) -> { + if (err != null) { + future.completeExceptionally(err); return; } this.<Void> newMasterCaller() - .action((controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call( - controller, stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), - resp -> null)) - .call().whenComplete((ret, err2) -> { + .action( + (controller, stub) -> this.<MoveRegionRequest, MoveRegionResponse, Void> call( + controller, stub, RequestConverter.buildMoveRegionRequest( + regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, req, done) -> s + .moveRegion(c, req, done), resp -> null)).call().whenComplete((ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { future.complete(ret); } }); - } - }); + }); return future; } @@ -1644,17 +1461,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers() { - return listReplicationPeers((Pattern) null); - } - - @Override - public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(String regex) { - return listReplicationPeers(Pattern.compile(regex)); - } - - @Override - public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Pattern pattern) { + public CompletableFuture<List<ReplicationPeerDescription>> listReplicationPeers(Optional<Pattern> pattern) { return this .<List<ReplicationPeerDescription>> newMasterCaller() .action( @@ -1676,18 +1483,17 @@ public class AsyncHBaseAdmin implements AsyncAdmin { (tables, error) -> { if (!completeExceptionally(future, error)) { List<TableCFs> replicatedTableCFs = new ArrayList<>(); - Arrays.asList(tables).forEach( - table -> { - Map<String, Integer> cfs = new HashMap<>(); - Stream.of(table.getColumnFamilies()) - .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) - .forEach(column -> { - cfs.put(column.getNameAsString(), column.getScope()); - }); - if (!cfs.isEmpty()) { - replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); - } - }); + tables.forEach(table -> { + Map<String, Integer> cfs = new HashMap<>(); + Stream.of(table.getColumnFamilies()) + .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) + .forEach(column -> { + cfs.put(column.getNameAsString(), column.getScope()); + }); + if (!cfs.isEmpty()) { + replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); + } + }); future.complete(replicatedTableCFs); } }); @@ -1707,8 +1513,8 @@ public class AsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) { - SnapshotProtos.SnapshotDescription snapshot = - ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); + SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil + .createHBaseProtosSnapshotDesc(snapshotDesc); try { ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot); } catch (IllegalArgumentException e) { @@ -1717,10 +1523,10 @@ public class AsyncHBaseAdmin implements AsyncAdmin { CompletableFuture<Void> future = new CompletableFuture<>(); final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build(); this.<Long> newMasterCaller() - .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call( - controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, done), - resp -> resp.getExpectedTimeout())) - .call().whenComplete((expectedTimeout, err) -> { + .action( + (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller, + stub, request, (s, c, req, done) -> s.snapshot(c, req, done), + resp -> resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> { if (err != null) { future.completeExceptionally(err); return; @@ -1734,25 +1540,24 @@ public class AsyncHBaseAdmin implements AsyncAdmin { @Override public void run(Timeout timeout) throws Exception { if (EnvironmentEdgeManager.currentTime() < endTime) { - isSnapshotFinished(snapshotDesc).whenComplete((done, err) -> { - if (err != null) { - future.completeExceptionally(err); + isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); } else if (done) { future.complete(null); } else { // retry again after pauseTime. - long pauseTime = ConnectionUtils - .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); - pauseTime = Math.min(pauseTime, maxPauseTime); - AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime, - TimeUnit.MILLISECONDS); - } - }); + long pauseTime = ConnectionUtils.getPauseTime( + TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries); + pauseTime = Math.min(pauseTime, maxPauseTime); + AsyncConnectionImpl.RETRY_TIMER + .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS); + } + } ); } else { - future.completeExceptionally(new SnapshotCreationException( - "Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:" - + expectedTimeout + " ms", - snapshotDesc)); + future.completeExceptionally(new SnapshotCreationException("Snapshot '" + + snapshot.getName() + "' wasn't completed in expectedTime:" + expectedTimeout + + " ms", snapshotDesc)); } } }; @@ -1763,13 +1568,15 @@ public class AsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) { - return this.<Boolean> newMasterCaller() - .action((controller, stub) -> this - .<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call(controller, stub, - IsSnapshotDoneRequest.newBuilder() - .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), - (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())) - .call(); + return this + .<Boolean> newMasterCaller() + .action( + (controller, stub) -> this.<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> call( + controller, + stub, + IsSnapshotDoneRequest.newBuilder() + .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, + req, done) -> s.isSnapshotDone(c, req, done), resp -> resp.getDone())).call(); } @Override @@ -1780,109 +1587,110 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return restoreSnapshot(snapshotName, takeFailSafeSnapshot); } - private CompletableFuture<Void> restoreSnapshotWithFailSafe(String snapshotName, - TableName tableName, boolean takeFailSafeSnapshot) { + @Override + public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean takeFailSafeSnapshot) { + CompletableFuture<Void> future = new CompletableFuture<>(); + listSnapshots(Pattern.compile(snapshotName)).whenComplete( + (snapshotDescriptions, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + TableName tableName = null; + if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { + for (SnapshotDescription snap : snapshotDescriptions) { + if (snap.getName().equals(snapshotName)) { + tableName = snap.getTableName(); + break; + } + } + } + if (tableName == null) { + future.completeExceptionally(new RestoreSnapshotException( + "Unable to find the table name for snapshot=" + snapshotName)); + return; + } + final TableName finalTableName = tableName; + tableExists(finalTableName) + .whenComplete((exists, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (!exists) { + // if table does not exist, then just clone snapshot into new table. + completeConditionalOnFuture(future, + internalRestoreSnapshot(snapshotName, finalTableName)); + } else { + isTableDisabled(finalTableName).whenComplete( + (disabled, err4) -> { + if (err4 != null) { + future.completeExceptionally(err4); + } else if (!disabled) { + future.completeExceptionally(new TableNotDisabledException(finalTableName)); + } else { + completeConditionalOnFuture(future, + restoreSnapshot(snapshotName, finalTableName, takeFailSafeSnapshot)); + } + }); + } + } ); + }); + return future; + } + + private CompletableFuture<Void> restoreSnapshot(String snapshotName, TableName tableName, + boolean takeFailSafeSnapshot) { if (takeFailSafeSnapshot) { CompletableFuture<Void> future = new CompletableFuture<>(); // Step.1 Take a snapshot of the current state - String failSafeSnapshotSnapshotNameFormat = - this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, - HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); - final String failSafeSnapshotSnapshotName = - failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", snapshotName) - .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) - .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); + String failSafeSnapshotSnapshotNameFormat = this.connection.getConfiguration().get( + HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME, + HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME); + final String failSafeSnapshotSnapshotName = failSafeSnapshotSnapshotNameFormat + .replace("{snapshot.name}", snapshotName) + .replace("{table.name}", tableName.toString().replace(TableName.NAMESPACE_DELIM, '.')) + .replace("{restore.timestamp}", String.valueOf(EnvironmentEdgeManager.currentTime())); LOG.info("Taking restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, err) -> { if (err != null) { future.completeExceptionally(err); } else { // Step.2 Restore snapshot - internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> { - if (err2 != null) { - // Step.3.a Something went wrong during the restore and try to rollback. - internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName) - .whenComplete((void3, err3) -> { - if (err3 != null) { - future.completeExceptionally(err3); - } else { - String msg = - "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" - + failSafeSnapshotSnapshotName + " succeeded."; - future.completeExceptionally(new RestoreSnapshotException(msg)); - } - }); - } else { - // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. - LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); - deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete((ret3, err3) -> { - if (err3 != null) { - LOG.error( - "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, - err3); - future.completeExceptionally(err3); - } else { - future.complete(ret3); - } - }); - } - }); + internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, err2) -> { + if (err2 != null) { + // Step.3.a Something went wrong during the restore and try to rollback. + internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName).whenComplete( + (void3, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + String msg = "Restore snapshot=" + snapshotName + " failed. Rollback to snapshot=" + + failSafeSnapshotSnapshotName + " succeeded."; + future.completeExceptionally(new RestoreSnapshotException(msg)); + } + }); + } else { + // Step.3.b If the restore is succeeded, delete the pre-restore snapshot. + LOG.info("Deleting restore-failsafe snapshot: " + failSafeSnapshotSnapshotName); + deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete( + (ret3, err3) -> { + if (err3 != null) { + LOG.error( + "Unable to remove the failsafe snapshot: " + failSafeSnapshotSnapshotName, err3); + future.completeExceptionally(err3); + } else { + future.complete(ret3); + } + }); } - }); + } ); + } + } ); return future; } else { return internalRestoreSnapshot(snapshotName, tableName); } } - @Override - public CompletableFuture<Void> restoreSnapshot(String snapshotName, - boolean takeFailSafeSnapshot) { - CompletableFuture<Void> future = new CompletableFuture<>(); - listSnapshots(snapshotName).whenComplete((snapshotDescriptions, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - TableName tableName = null; - if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) { - for (SnapshotDescription snap : snapshotDescriptions) { - if (snap.getName().equals(snapshotName)) { - tableName = snap.getTableName(); - break; - } - } - } - if (tableName == null) { - future.completeExceptionally(new RestoreSnapshotException( - "Unable to find the table name for snapshot=" + snapshotName)); - return; - } - final TableName finalTableName = tableName; - tableExists(finalTableName).whenComplete((exists, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); - } else if (!exists) { - // if table does not exist, then just clone snapshot into new table. - completeConditionalOnFuture(future, - internalRestoreSnapshot(snapshotName, finalTableName)); - } else { - isTableDisabled(finalTableName).whenComplete((disabled, err4) -> { - if (err4 != null) { - future.completeExceptionally(err4); - } else if (!disabled) { - future.completeExceptionally(new TableNotDisabledException(finalTableName)); - } else { - completeConditionalOnFuture(future, - restoreSnapshotWithFailSafe(snapshotName, finalTableName, takeFailSafeSnapshot)); - } - }); - } - }); - }); - return future; - } - private <T> void completeConditionalOnFuture(CompletableFuture<T> dependentFuture, CompletableFuture<T> parentFuture) { parentFuture.whenComplete((res, err) -> { @@ -1909,8 +1717,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return future; } - private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, - TableName tableName) { + private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, TableName tableName) { SnapshotProtos.SnapshotDescription snapshot = SnapshotProtos.SnapshotDescription.newBuilder() .setName(snapshotName).setTable(tableName.getNameAsString()).build(); try { @@ -1918,86 +1725,78 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } catch (IllegalArgumentException e) { return failedFuture(e); } - return waitProcedureResult( - this.<Long> newMasterCaller() - .action((controller, stub) -> this - .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub, - RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot) - .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), - (s, c, req, done) -> s.restoreSnapshot(c, req, done), - (resp) -> resp.getProcId())) - .call()); + return waitProcedureResult(this + .<Long> newMasterCaller() + .action( + (controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call( + controller, stub, RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot) + .setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req, + done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())).call()); } @Override public CompletableFuture<List<SnapshotDescription>> listSnapshots() { - return this.<List<SnapshotDescription>> newMasterCaller() - .action((controller, stub) -> this - .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> call( - controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), - (s, c, req, done) -> s.getCompletedSnapshots(c, req, done), - resp -> resp.getSnapshotsList().stream().map(ProtobufUtil::createSnapshotDesc) - .collect(Collectors.toList()))) + return this + .<List<SnapshotDescription>> newMasterCaller() + .action( + (controller, stub) -> this + .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, List<SnapshotDescription>> call( + controller, stub, GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req, + done) -> s.getCompletedSnapshots(c, req, done), resp -> resp.getSnapshotsList() + .stream().map(ProtobufUtil::createSnapshotDesc).collect(Collectors.toList()))) .call(); } @Override - public CompletableFuture<List<SnapshotDescription>> listSnapshots(String regex) { - return listSnapshots(Pattern.compile(regex)); - } - - @Override public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern) { CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); - listSnapshots().whenComplete((snapshotDescList, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - if (snapshotDescList == null || snapshotDescList.isEmpty()) { - future.complete(Collections.emptyList()); - return; - } - future.complete(snapshotDescList.stream() - .filter(snap -> pattern.matcher(snap.getName()).matches()).collect(Collectors.toList())); - }); + listSnapshots() + .whenComplete( + (snapshotDescList, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (snapshotDescList == null || snapshotDescList.isEmpty()) { + future.complete(Collections.emptyList()); + return; + } + future.complete(snapshotDescList.stream() + .filter(snap -> pattern.matcher(snap.getName()).matches()) + .collect(Collectors.toList())); + }); return future; } @Override - public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(String tableNameRegex, - String snapshotNameRegex) { - return listTableSnapshots(Pattern.compile(tableNameRegex), Pattern.compile(snapshotNameRegex)); - } - - @Override public CompletableFuture<List<SnapshotDescription>> listTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { CompletableFuture<List<SnapshotDescription>> future = new CompletableFuture<>(); - listTableNames(tableNamePattern, false).whenComplete((tableNames, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - if (tableNames == null || tableNames.length <= 0) { - future.complete(Collections.emptyList()); - return; - } - List<TableName> tableNameList = Arrays.asList(tableNames); - listSnapshots(snapshotNamePattern).whenComplete((snapshotDescList, err2) -> { - if (err2 != null) { - future.completeExceptionally(err2); + listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete( + (tableNames, err) -> { + if (err != null) { + future.completeExceptionally(err); return; } - if (snapshotDescList == null || snapshotDescList.isEmpty()) { + if (tableNames == null || tableNames.size() <= 0) { future.complete(Collections.emptyList()); return; } - future.complete(snapshotDescList.stream() - .filter(snap -> (snap != null && tableNameList.contains(snap.getTableName()))) - .collect(Collectors.toList())); + listSnapshots(snapshotNamePattern).whenComplete( + (snapshotDescList, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (snapshotDescList == null || snapshotDescList.isEmpty()) { + future.complete(Collections.emptyList()); + return; + } + future.complete(snapshotDescList.stream() + .filter(snap -> (snap != null && tableNames.contains(snap.getTableName()))) + .collect(Collectors.toList())); + }); }); - }); return future; } @@ -2007,47 +1806,46 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<Void> deleteSnapshots(String regex) { - return deleteSnapshots(Pattern.compile(regex)); - } - - @Override public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) { return deleteTableSnapshots(null, snapshotNamePattern); } @Override - public CompletableFuture<Void> deleteTableSnapshots(String tableNameRegex, - String snapshotNameRegex) { - return deleteTableSnapshots(Pattern.compile(tableNameRegex), - Pattern.compile(snapshotNameRegex)); - } - - @Override public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern, Pattern snapshotNamePattern) { CompletableFuture<Void> future = new CompletableFuture<>(); - listTableSnapshots(tableNamePattern, snapshotNamePattern) - .whenComplete(((snapshotDescriptions, err) -> { - if (err != null) { - future.completeExceptionally(err); - return; - } - if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { - future.complete(null); - return; - } - List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>(); - snapshotDescriptions - .forEach(snapDesc -> deleteSnapshotFutures.add(internalDeleteSnapshot(snapDesc))); - CompletableFuture - .allOf(deleteSnapshotFutures - .toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()])) - .thenAccept(v -> future.complete(v)); - })); + listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete( + ((snapshotDescriptions, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) { + future.complete(null); + return; + } + List<CompletableFuture<Void>> deleteSnapshotFutures = new ArrayList<>(); + snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures + .add(internalDeleteSnapshot(snapDesc))); + CompletableFuture.allOf( + deleteSnapshotFutures.toArray(new CompletableFuture<?>[deleteSnapshotFutures.size()])) + .thenAccept(v -> future.complete(v)); + })); return future; } + private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { + return this + .<Void> newMasterCaller() + .action( + (controller, stub) -> this.<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call( + controller, + stub, + DeleteSnapshotRequest.newBuilder() + .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, c, + req, done) -> s.deleteSnapshot(c, req, done), resp -> null)).call(); + } + @Override public CompletableFuture<Void> execProcedure(String signature, String instance, Map<String, String> props) { @@ -2072,9 +1870,9 @@ public class AsyncHBaseAdmin implements AsyncAdmin { @Override public void run(Timeout timeout) throws Exception { if (EnvironmentEdgeManager.currentTime() < endTime) { - isProcedureFinished(signature, instance, props).whenComplete((done, err) -> { - if (err != null) { - future.completeExceptionally(err); + isProcedureFinished(signature, instance, props).whenComplete((done, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); return; } if (done) { @@ -2137,24 +1935,87 @@ public class AsyncHBaseAdmin implements AsyncAdmin { } @Override - public CompletableFuture<ProcedureInfo[]> listProcedures() { - return this.<ProcedureInfo[]> newMasterCaller() - .action((controller, stub) -> this - .<ListProceduresRequest, ListProceduresResponse, ProcedureInfo[]> call(controller, stub, - ListProceduresRequest.newBuilder().build(), - (s, c, req, done) -> s.listProcedures(c, req, done), resp -> resp.getProcedureList() - .stream().map(ProtobufUtil::toProcedureInfo).toArray(ProcedureInfo[]::new))) - .call(); + public CompletableFuture<List<ProcedureInfo>> listProcedures() { + return this + .<List<ProcedureInfo>> newMasterCaller() + .action( + (controller, stub) -> this + .<ListProceduresRequest, ListProceduresResponse, List<ProcedureInfo>> call( + controller, stub, ListProceduresRequest.newBuilder().build(), + (s, c, req, done) -> s.listProcedures(c, req, done), + resp -> resp.getProcedureList().stream().map(ProtobufUtil::toProcedureInfo) + .collect(Collectors.toList()))).call(); } - private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription snapshot) { - return this.<Void> newMasterCaller() - .action((controller, stub) -> this - .<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> call(controller, stub, - DeleteSnapshotRequest.newBuilder() - .setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), - (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> null)) - .call(); + /** + * Get the region location for the passed region name. The region name may be a full region name + * or encoded region name. If the region does not found, then it'll throw an + * UnknownRegionException wrapped by a {@link CompletableFuture} + * @param regionNameOrEncodedRegionName + * @return region location, wrapped by a {@link CompletableFuture} + */ + @VisibleForTesting + CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedRegionName) { + if (regionNameOrEncodedRegionName == null) { + return failedFuture(new IllegalArgumentException("Passed region name can't be null")); + } + try { + CompletableFuture<Optional<HRegionLocation>> future; + if (HRegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { + future = AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable, + regionNameOrEncodedRegionName); + } else { + future = AsyncMetaTableAccessor.getRegionLocation(metaTable, regionNameOrEncodedRegionName); + } + + CompletableFuture<HRegionLocation> returnedFuture = new CompletableFuture<>(); + future.whenComplete((location, err) -> { + if (err != null) { + returnedFuture.completeExceptionally(err); + return; + } + if (!location.isPresent() || location.get().getRegionInfo() == null) { + returnedFuture.completeExceptionally(new UnknownRegionException( + "Invalid region name or encoded region name: " + + Bytes.toStringBinary(regionNameOrEncodedRegionName))); + } else { + returnedFuture.complete(location.get()); + } + }); + return returnedFuture; + } catch (IOException e) { + return failedFuture(e); + } + } + + /** + * Get the region info for the passed region name. The region name may be a full region name or + * encoded region name. If the region does not found, then it'll throw an UnknownRegionException + * wrapped by a {@link CompletableFuture} + * @param regionNameOrEncodedRegionName + * @return region info, wrapped by a {@link CompletableFuture} + */ + private CompletableFuture<HRegionInfo> getRegionInfo(byte[] regionNameOrEncodedRegionName) { + if (regionNameOrEncodedRegionName == null) { + return failedFuture(new IllegalArgumentException("Passed region name can't be null")); + } + + if (Bytes.equals(regionNameOrEncodedRegionName, + HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionNameOrEncodedRegionName, + HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { + return CompletableFuture.completedFuture(HRegionInfo.FIRST_META_REGIONINFO); + } + + CompletableFuture<HRegionInfo> future = new CompletableFuture<>(); + getRegionLocation(regionNameOrEncodedRegionName).whenComplete((location, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + future.complete(location.getRegionInfo()); + } + }); + return future; } private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/2d781aa1/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index b196911..5f8924f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -33,8 +33,10 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableSet; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -414,18 +416,14 @@ public final class ProtobufUtil { } /** - * Get NamespaceDescriptor[] from ListNamespaceDescriptorsResponse protobuf + * Get a list of NamespaceDescriptor from ListNamespaceDescriptorsResponse protobuf * @param proto the ListNamespaceDescriptorsResponse - * @return NamespaceDescriptor[] + * @return a list of NamespaceDescriptor */ - public static NamespaceDescriptor[] getNamespaceDescriptorArray( + public static List<NamespaceDescriptor> toNamespaceDescriptorList( ListNamespaceDescriptorsResponse proto) { - List<HBaseProtos.NamespaceDescriptor> list = proto.getNamespaceDescriptorList(); - NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()]; - for (int i = 0; i < list.size(); i++) { - res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i)); - } - return res; + return proto.getNamespaceDescriptorList().stream().map(ProtobufUtil::toNamespaceDescriptor) + .collect(Collectors.toList()); } /** @@ -433,7 +431,7 @@ public final class ProtobufUtil { * * @param proto the GetTableDescriptorsResponse * @return a immutable HTableDescriptor array - * @deprecated Use {@link #getTableDescriptorArray} after removing the HTableDescriptor + * @deprecated Use {@link #toTableDescriptorList} after removing the HTableDescriptor */ @Deprecated public static HTableDescriptor[] getHTableDescriptorArray(GetTableDescriptorsResponse proto) { @@ -447,18 +445,17 @@ public final class ProtobufUtil { } /** - * Get TableDescriptor[] from GetTableDescriptorsResponse protobuf + * Get a list of TableDescriptor from GetTableDescriptorsResponse protobuf * * @param proto the GetTableDescriptorsResponse - * @return TableDescriptor[] + * @return a list of TableDescriptor */ - public static TableDescriptor[] getTableDescriptorArray(GetTableDescriptorsResponse proto) { - if (proto == null) return new TableDescriptor[0]; - return proto.getTableSchemaList() - .stream() - .map(ProtobufUtil::convertToTableDesc) - .toArray(size -> new TableDescriptor[size]); + public static List<TableDescriptor> toTableDescriptorList(GetTableDescriptorsResponse proto) { + if (proto == null) return new ArrayList<>(); + return proto.getTableSchemaList().stream().map(ProtobufUtil::convertToTableDesc) + .collect(Collectors.toList()); } + /** * get the split keys in form "byte [][]" from a CreateTableRequest proto * @@ -2398,6 +2395,13 @@ public final class ProtobufUtil { .setQualifier(UnsafeByteOperations.unsafeWrap(tableName.getQualifier())).build(); } + public static List<TableName> toTableNameList(List<HBaseProtos.TableName> tableNamesList) { + if (tableNamesList == null) { + return new ArrayList<>(); + } + return tableNamesList.stream().map(ProtobufUtil::toTableName).collect(Collectors.toList()); + } + public static TableName[] getTableNameArray(List<HBaseProtos.TableName> tableNamesList) { if (tableNamesList == null) { return new TableName[0]; @@ -3345,23 +3349,33 @@ public final class ProtobufUtil { } /** - * Create a SplitRegionRequest for a given region name - * - * @param regionName the name of the region to split - * @param splitPoint the split point - * @return a SplitRegionRequest - */ - public static SplitRegionRequest buildSplitRegionRequest( - final byte[] regionName, final byte[] splitPoint) { - SplitRegionRequest.Builder builder = SplitRegionRequest.newBuilder(); - RegionSpecifier region = RequestConverter.buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - if (splitPoint != null) { - builder.setSplitPoint(UnsafeByteOperations.unsafeWrap(splitPoint)); - } - return builder.build(); - } + * Create a SplitRegionRequest for a given region name + * @param regionName the name of the region to split + * @param splitPoint the split point + * @return a SplitRegionRequest + * @deprecated Use {@link #buildSplitRegionRequest(byte[], Optional)} instead. + */ + @Deprecated + public static SplitRegionRequest buildSplitRegionRequest(final byte[] regionName, + final byte[] splitPoint) { + return buildSplitRegionRequest(regionName, Optional.ofNullable(splitPoint)); + } + + /** + * Create a SplitRegionRequest for a given region name + * @param regionName the name of the region to split + * @param splitPoint the split point + * @return a SplitRegionRequest + */ + public static SplitRegionRequest buildSplitRegionRequest(byte[] regionName, + Optional<byte[]> splitPoint) { + SplitRegionRequest.Builder builder = SplitRegionRequest.newBuilder(); + RegionSpecifier region = + RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); + builder.setRegion(region); + splitPoint.ifPresent(sp -> builder.setSplitPoint(UnsafeByteOperations.unsafeWrap(sp))); + return builder.build(); + } public static ProcedureDescription buildProcedureDescription(String signature, String instance, Map<String, String> props) { http://git-wip-us.apache.org/repos/asf/hbase/blob/2d781aa1/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 67f7d0a..39ae6a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.shaded.protobuf; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; @@ -919,25 +920,37 @@ public final class RequestConverter { builder.setRegionInfo(HRegionInfo.convert(regionInfo)); return builder.build(); } - /** - * Create a CompactRegionRequest for a given region name - * - * @param regionName the name of the region to get info - * @param major indicator if it is a major compaction - * @return a CompactRegionRequest - */ - public static CompactRegionRequest buildCompactRegionRequest( - final byte[] regionName, final boolean major, final byte [] family) { - CompactRegionRequest.Builder builder = CompactRegionRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - builder.setMajor(major); - if (family != null) { - builder.setFamily(UnsafeByteOperations.unsafeWrap(family)); - } - return builder.build(); - } + + /** + * Create a CompactRegionRequest for a given region name + * @param regionName the name of the region to get info + * @param major indicator if it is a major compaction + * @param columnFamily + * @return a CompactRegionRequest + * @deprecated Use {@link #buildCompactRegionRequest(byte[], boolean, Optional)} instead. + */ + @Deprecated + public static CompactRegionRequest buildCompactRegionRequest(byte[] regionName, boolean major, + byte[] columnFamily) { + return buildCompactRegionRequest(regionName, major, Optional.ofNullable(columnFamily)); + } + + /** + * Create a CompactRegionRequest for a given region name + * @param regionName the name of the region to get info + * @param major indicator if it is a major compaction + * @param columnFamily + * @return a CompactRegionRequest + */ + public static CompactRegionRequest buildCompactRegionRequest(byte[] regionName, boolean major, + Optional<byte[]> columnFamily) { + CompactRegionRequest.Builder builder = CompactRegionRequest.newBuilder(); + RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); + builder.setRegion(region); + builder.setMajor(major); + columnFamily.ifPresent(family -> builder.setFamily(UnsafeByteOperations.unsafeWrap(family))); + return builder.build(); + } /** * @see {@link #buildRollWALWriterRequest()} @@ -1084,12 +1097,13 @@ public final class RequestConverter { /** * Create a protocol buffer MoveRegionRequest - * * @param encodedRegionName * @param destServerName * @return A MoveRegionRequest * @throws DeserializationException + * @deprecated Use {@link #buildMoveRegionRequest(byte[], Optional)} instead. */ + @Deprecated public static MoveRegionRequest buildMoveRegionRequest( final byte [] encodedRegionName, final byte [] destServerName) throws DeserializationException { @@ -1103,6 +1117,22 @@ public final class RequestConverter { return builder.build(); } + /** + * Create a protocol buffer MoveRegionRequest + * @param encodedRegionName + * @param destServerName + * @return A MoveRegionRequest + */ + public static MoveRegionRequest buildMoveRegionRequest(byte[] encodedRegionName, + Optional<ServerName> destServerName) { + MoveRegionRequest.Builder builder = MoveRegionRequest.newBuilder(); + builder.setRegion(buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME, + encodedRegionName)); + destServerName.ifPresent(serverName -> builder.setDestServerName(ProtobufUtil + .toServerName(serverName))); + return builder.build(); + } + public static MergeTableRegionsRequest buildMergeTableRegionsRequest( final byte[][] encodedNameOfdaughaterRegions, final boolean forcible, @@ -1310,11 +1340,25 @@ public final class RequestConverter { * @param pattern The compiled regular expression to match against * @param includeSysTables False to match only against userspace tables * @return a GetTableDescriptorsRequest + * @deprecated Use {@link #buildGetTableDescriptorsRequest(Optional, boolean)} instead. */ + @Deprecated public static GetTableDescriptorsRequest buildGetTableDescriptorsRequest(final Pattern pattern, boolean includeSysTables) { + return buildGetTableDescriptorsRequest(Optional.ofNullable(pattern), includeSysTables); + } + + /** + * Creates a protocol buffer GetTableDescriptorsRequest + * + * @param pattern The compiled regular expression to match against + * @param includeSysTables False to match only against userspace tables + * @return a GetTableDescriptorsRequest + */ + public static GetTableDescriptorsRequest + buildGetTableDescriptorsRequest(Optional<Pattern> pattern, boolean includeSysTables) { GetTableDescriptorsRequest.Builder builder = GetTableDescriptorsRequest.newBuilder(); - if (pattern != null) builder.setRegex(pattern.toString()); + pattern.ifPresent(p -> builder.setRegex(p.toString())); builder.setIncludeSysTables(includeSysTables); return builder.build(); } @@ -1325,11 +1369,25 @@ public final class RequestConverter { * @param pattern The compiled regular expression to match against * @param includeSysTables False to match only against userspace tables * @return a GetTableNamesRequest + * @deprecated Use {@link #buildGetTableNamesRequest(Optional, boolean)} instead. */ + @Deprecated public static GetTableNamesRequest buildGetTableNamesRequest(final Pattern pattern, boolean includeSysTables) { + return buildGetTableNamesRequest(Optional.ofNullable(pattern), includeSysTables); + } + + /** + * Creates a protocol buffer GetTableNamesRequest + * + * @param pattern The compiled regular expression to match against + * @param includeSysTables False to match only against userspace tables + * @return a GetTableNamesRequest + */ + public static GetTableNamesRequest buildGetTableNamesRequest(Optional<Pattern> pattern, + boolean includeSysTables) { GetTableNamesRequest.Builder builder = GetTableNamesRequest.newBuilder(); - if (pattern != null) builder.setRegex(pattern.toString()); + pattern.ifPresent(p -> builder.setRegex(p.toString())); builder.setIncludeSysTables(includeSysTables); return builder.build(); } @@ -1635,11 +1693,18 @@ public final class RequestConverter { return builder.build(); } + /** + * @deprecated Use {@link #buildListReplicationPeersRequest(Optional)} instead. + */ + @Deprecated public static ListReplicationPeersRequest buildListReplicationPeersRequest(Pattern pattern) { + return buildListReplicationPeersRequest(Optional.ofNullable(pattern)); + } + + public static ListReplicationPeersRequest + buildListReplicationPeersRequest(Optional<Pattern> pattern) { ListReplicationPeersRequest.Builder builder = ListReplicationPeersRequest.newBuilder(); - if (pattern != null) { - builder.setRegex(pattern.toString()); - } + pattern.ifPresent(p -> builder.setRegex(p.toString())); return builder.build(); }