http://git-wip-us.apache.org/repos/asf/hbase/blob/28993833/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index c972b4c..8505241 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -40,10 +40,13 @@ import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+
 import java.util.stream.Stream;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.directory.api.util.OptionalComponentsMonitor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MetaTableAccessor;
@@ -190,7 +193,6 @@ import org.apache.hadoop.hbase.util.Pair;
  * The implementation of AsyncAdmin.
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public class AsyncHBaseAdmin implements AsyncAdmin {
   public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = 
"flush-table-proc";
 
@@ -278,7 +280,6 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     return future;
   }
 
-  //TODO abstract call and adminCall into a single method.
   private <PREQ, PRESP, RESP> CompletableFuture<RESP> 
adminCall(HBaseRpcController controller,
       AdminService.Interface stub, PREQ preq, AdminRpcCall<PRESP, PREQ> 
rpcCall,
       Converter<RESP, PRESP> respConverter) {
@@ -318,25 +319,26 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     CompletableFuture<Void> operate(TableName table);
   }
 
-  private CompletableFuture<TableDescriptor[]> batchTableOperations(Pattern 
pattern,
+  private CompletableFuture<List<TableDescriptor>> 
batchTableOperations(Pattern pattern,
       TableOperator operator, String operationType) {
-    CompletableFuture<TableDescriptor[]> future = new CompletableFuture<>();
+    CompletableFuture<List<TableDescriptor>> future = new 
CompletableFuture<>();
     List<TableDescriptor> failed = new LinkedList<>();
-    listTables(pattern, false).whenComplete(
+    listTables(Optional.ofNullable(pattern), false).whenComplete(
       (tables, error) -> {
         if (error != null) {
           future.completeExceptionally(error);
           return;
         }
-        CompletableFuture[] futures = Arrays.stream(tables)
-            .map((table) -> 
operator.operate(table.getTableName()).whenComplete((v, ex) -> {
-              if (ex != null) {
-                LOG.info("Failed to " + operationType + " table " + 
table.getTableName(), ex);
-                failed.add(table);
-              }
-            })).<CompletableFuture> toArray(size -> new 
CompletableFuture[size]);
+        CompletableFuture[] futures =
+            tables.stream()
+                .map((table) -> 
operator.operate(table.getTableName()).whenComplete((v, ex) -> {
+                  if (ex != null) {
+                    LOG.info("Failed to " + operationType + " table " + 
table.getTableName(), ex);
+                    failed.add(table);
+                  }
+                })).<CompletableFuture> toArray(size -> new 
CompletableFuture[size]);
         CompletableFuture.allOf(futures).thenAccept((v) -> {
-          future.complete(failed.toArray(new TableDescriptor[failed.size()]));
+          future.complete(failed);
         });
       });
     return future;
@@ -353,47 +355,28 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<TableDescriptor[]> listTables() {
-    return listTables((Pattern) null, false);
-  }
-
-  @Override
-  public CompletableFuture<TableDescriptor[]> listTables(String regex, boolean 
includeSysTables) {
-    return listTables(Pattern.compile(regex), false);
-  }
-
-  @Override
-  public CompletableFuture<TableDescriptor[]> listTables(Pattern pattern, 
boolean includeSysTables) {
-    return this
-        .<TableDescriptor[]>newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 
TableDescriptor[]> call(
-                controller, stub, 
RequestConverter.buildGetTableDescriptorsRequest(pattern,
-                  includeSysTables), (s, c, req, done) -> 
s.getTableDescriptors(c, req, done), (
-                    resp) -> 
ProtobufUtil.getTableDescriptorArray(resp))).call();
-  }
-
-  @Override
-  public CompletableFuture<TableName[]> listTableNames() {
-    return listTableNames((Pattern) null, false);
-  }
-
-  @Override
-  public CompletableFuture<TableName[]> listTableNames(String regex, boolean 
includeSysTables) {
-    return listTableNames(Pattern.compile(regex), false);
+  public CompletableFuture<List<TableDescriptor>> listTables(Optional<Pattern> 
pattern,
+      boolean includeSysTables) {
+    return this.<List<TableDescriptor>> newMasterCaller()
+        .action((controller, stub) -> this
+            .<GetTableDescriptorsRequest, GetTableDescriptorsResponse, 
List<TableDescriptor>> call(
+              controller, stub,
+              RequestConverter.buildGetTableDescriptorsRequest(pattern, 
includeSysTables),
+              (s, c, req, done) -> s.getTableDescriptors(c, req, done),
+              (resp) -> ProtobufUtil.toTableDescriptorList(resp)))
+        .call();
   }
 
   @Override
-  public CompletableFuture<TableName[]> listTableNames(Pattern pattern, 
boolean includeSysTables) {
-    return this
-        .<TableName[]>newMasterCaller()
-        .action(
-          (controller, stub) -> this
-              .<GetTableNamesRequest, GetTableNamesResponse, TableName[]> 
call(controller, stub,
-                RequestConverter.buildGetTableNamesRequest(pattern, 
includeSysTables), (s, c, req,
-                    done) -> s.getTableNames(c, req, done), (resp) -> 
ProtobufUtil
-                    .getTableNameArray(resp.getTableNamesList()))).call();
+  public CompletableFuture<List<TableName>> listTableNames(Optional<Pattern> 
pattern,
+      boolean includeSysTables) {
+    return this.<List<TableName>> newMasterCaller()
+        .action((controller, stub) -> this
+            .<GetTableNamesRequest, GetTableNamesResponse, List<TableName>> 
call(controller, stub,
+              RequestConverter.buildGetTableNamesRequest(pattern, 
includeSysTables),
+              (s, c, req, done) -> s.getTableNames(c, req, done),
+              (resp) -> 
ProtobufUtil.toTableNameList(resp.getTableNamesList())))
+        .call();
   }
 
   @Override
@@ -472,12 +455,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<TableDescriptor[]> deleteTables(String regex) {
-    return deleteTables(Pattern.compile(regex));
-  }
-
-  @Override
-  public CompletableFuture<TableDescriptor[]> deleteTables(Pattern pattern) {
+  public CompletableFuture<List<TableDescriptor>> deleteTables(Pattern 
pattern) {
     return batchTableOperations(pattern, (table) -> deleteTable(table), 
"DELETE");
   }
 
@@ -498,12 +476,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<TableDescriptor[]> enableTables(String regex) {
-    return enableTables(Pattern.compile(regex));
-  }
-
-  @Override
-  public CompletableFuture<TableDescriptor[]> enableTables(Pattern pattern) {
+  public CompletableFuture<List<TableDescriptor>> enableTables(Pattern 
pattern) {
     return batchTableOperations(pattern, (table) -> enableTable(table), 
"ENABLE");
   }
 
@@ -516,16 +489,10 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<TableDescriptor[]> disableTables(String regex) {
-    return disableTables(Pattern.compile(regex));
-  }
-
-  @Override
-  public CompletableFuture<TableDescriptor[]> disableTables(Pattern pattern) {
+  public CompletableFuture<List<TableDescriptor>> disableTables(Pattern 
pattern) {
     return batchTableOperations(pattern, (table) -> disableTable(table), 
"DISABLE");
   }
 
-
   @Override
   public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
     CompletableFuture<Boolean> future = new CompletableFuture<>();
@@ -577,7 +544,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
         if (!enabled) {
           future.complete(false);
         } else {
-          AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, 
Optional.of(tableName))
+          AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, 
Optional.of(tableName))
               .whenComplete(
                 (locations, error1) -> {
                   if (error1 != null) {
@@ -586,12 +553,12 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
                   }
                   int notDeployed = 0;
                   int regionCount = 0;
-                  for (Pair<HRegionInfo, ServerName> pair : locations) {
-                    HRegionInfo info = pair.getFirst();
-                    if (pair.getSecond() == null) {
+                  for (HRegionLocation location : locations) {
+                    HRegionInfo info = location.getRegionInfo();
+                    if (location.getServerName() == null) {
                       if (LOG.isDebugEnabled()) {
                         LOG.debug("Table " + tableName + " has not deployed 
region "
-                            + pair.getFirst().getEncodedName());
+                            + info.getEncodedName());
                       }
                       notDeployed++;
                     } else if (splitKeys != null
@@ -706,21 +673,21 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<NamespaceDescriptor[]> listNamespaceDescriptors() {
+  public CompletableFuture<List<NamespaceDescriptor>> 
listNamespaceDescriptors() {
     return this
-        .<NamespaceDescriptor[]> newMasterCaller()
+        .<List<NamespaceDescriptor>> newMasterCaller()
         .action(
           (controller, stub) -> this
-              .<ListNamespaceDescriptorsRequest, 
ListNamespaceDescriptorsResponse, NamespaceDescriptor[]> call(
+              .<ListNamespaceDescriptorsRequest, 
ListNamespaceDescriptorsResponse, List<NamespaceDescriptor>> call(
                 controller, stub, 
ListNamespaceDescriptorsRequest.newBuilder().build(), (s, c, req,
                     done) -> s.listNamespaceDescriptors(c, req, done), (resp) 
-> ProtobufUtil
-                    .getNamespaceDescriptorArray(resp))).call();
+                    .toNamespaceDescriptorList(resp))).call();
   }
 
   @Override
-  public CompletableFuture<Boolean> setBalancerRunning(final boolean on) {
+  public CompletableFuture<Boolean> setBalancerOn(final boolean on) {
     return this
-        .<Boolean>newMasterCaller()
+        .<Boolean> newMasterCaller()
         .action(
           (controller, stub) -> this
               .<SetBalancerRunningRequest, SetBalancerRunningResponse, 
Boolean> call(controller,
@@ -730,24 +697,19 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Boolean> balancer() {
-    return balancer(false);
-  }
-
-  @Override
-  public CompletableFuture<Boolean> balancer(boolean force) {
+  public CompletableFuture<Boolean> balance(boolean forcible) {
     return this
-        .<Boolean>newMasterCaller()
+        .<Boolean> newMasterCaller()
         .action(
           (controller, stub) -> this.<BalanceRequest, BalanceResponse, 
Boolean> call(controller,
-            stub, RequestConverter.buildBalanceRequest(force),
+            stub, RequestConverter.buildBalanceRequest(forcible),
             (s, c, req, done) -> s.balance(c, req, done), (resp) -> 
resp.getBalancerRan())).call();
   }
 
   @Override
-  public CompletableFuture<Boolean> isBalancerEnabled() {
+  public CompletableFuture<Boolean> isBalancerOn() {
     return this
-        .<Boolean>newMasterCaller()
+        .<Boolean> newMasterCaller()
         .action(
           (controller, stub) -> this.<IsBalancerEnabledRequest, 
IsBalancerEnabledResponse, Boolean> call(
             controller, stub, RequestConverter.buildIsBalancerEnabledRequest(),
@@ -756,109 +718,38 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> closeRegion(String regionname, String 
serverName) {
-    return closeRegion(Bytes.toBytes(regionname), serverName);
-  }
-
-  @Override
-  public CompletableFuture<Void> closeRegion(byte[] regionName, String 
serverName) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegion(regionName).whenComplete((p, err) -> {
+  public CompletableFuture<Boolean> closeRegion(byte[] regionName, 
Optional<ServerName> serverName) {
+    CompletableFuture<Boolean> future = new CompletableFuture<>();
+    getRegionLocation(regionName).whenComplete((location, err) -> {
       if (err != null) {
         future.completeExceptionally(err);
         return;
       }
-      if (p == null || p.getFirst() == null) {
-        future.completeExceptionally(new 
UnknownRegionException(Bytes.toStringBinary(regionName)));
-        return;
-      }
-      if (serverName != null) {
-        closeRegion(ServerName.valueOf(serverName), 
p.getFirst()).whenComplete((p2, err2) -> {
+      ServerName server = serverName.isPresent() ? serverName.get() : 
location.getServerName();
+      if (server == null) {
+        future.completeExceptionally(new 
NotServingRegionException(regionName));
+      } else {
+        closeRegion(location.getRegionInfo(), server).whenComplete((result, 
err2) -> {
           if (err2 != null) {
             future.completeExceptionally(err2);
-          }else{
-            future.complete(null);
+          } else {
+            future.complete(result);
           }
         });
-      } else {
-        if (p.getSecond() == null) {
-          future.completeExceptionally(new 
NotServingRegionException(regionName));
-        } else {
-          closeRegion(p.getSecond(), p.getFirst()).whenComplete((p2, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            }else{
-              future.complete(null);
-            }
-          });
-        }
       }
     });
     return future;
   }
 
-  CompletableFuture<Pair<HRegionInfo, ServerName>> getRegion(byte[] 
regionName) {
-    if (regionName == null) {
-      return failedFuture(new IllegalArgumentException("Pass region name"));
-    }
-    CompletableFuture<Pair<HRegionInfo, ServerName>> future = new 
CompletableFuture<>();
-    AsyncMetaTableAccessor.getRegion(metaTable, regionName).whenComplete(
-      (p, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-        } else if (p != null) {
-          future.complete(p);
-        } else {
-          metaTable.scanAll(
-            new 
Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY))
-              .whenComplete((results, err2) -> {
-                if (err2 != null) {
-                  future.completeExceptionally(err2);
-                  return;
-                }
-                String encodedName = Bytes.toString(regionName);
-                if (results != null && !results.isEmpty()) {
-                  for (Result r : results) {
-                    if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == 
null) continue;
-                    RegionLocations rl = 
MetaTableAccessor.getRegionLocations(r);
-                    if (rl != null) {
-                      for (HRegionLocation h : rl.getRegionLocations()) {
-                        if (h != null && 
encodedName.equals(h.getRegionInfo().getEncodedName())) {
-                          future.complete(new Pair<>(h.getRegionInfo(), 
h.getServerName()));
-                          return;
-                        }
-                      }
-                    }
-                  }
-                }
-                future.complete(null);
-              });
-        }
-      });
-    return future;
-  }
-
-  @Override
-  public CompletableFuture<Boolean> closeRegionWithEncodedRegionName(String 
encodedRegionName,
-      String serverName) {
+  private CompletableFuture<Boolean> closeRegion(HRegionInfo hri, ServerName 
serverName) {
     return this
         .<Boolean> newAdminCaller()
         .action(
           (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, 
Boolean> adminCall(
             controller, stub,
-            
ProtobufUtil.buildCloseRegionRequest(ServerName.valueOf(serverName), 
encodedRegionName),
-            (s, c, req, done) -> s.closeRegion(controller, req, done), (resp) 
-> resp.getClosed()))
-        .serverName(ServerName.valueOf(serverName)).call();
-  }
-
-  @Override
-  public CompletableFuture<Void> closeRegion(ServerName sn, HRegionInfo hri) {
-    return this.<Void> newAdminCaller()
-        .action(
-          (controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, 
Void> adminCall(
-            controller, stub, ProtobufUtil.buildCloseRegionRequest(sn, 
hri.getRegionName()),
-            (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> 
null))
-        .serverName(sn).call();
+            ProtobufUtil.buildCloseRegionRequest(serverName, 
hri.getRegionName()),
+            (s, c, req, done) -> s.closeRegion(controller, req, done), resp -> 
resp.getClosed()))
+        .serverName(serverName).call();
   }
 
   @Override
@@ -905,75 +796,54 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   @Override
   public CompletableFuture<Void> flushRegion(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegion(regionName).whenComplete((p, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      if (p == null || p.getFirst() == null) {
-        future.completeExceptionally(
-          new IllegalArgumentException("Invalid region: " + 
Bytes.toStringBinary(regionName)));
-        return;
-      }
-      if (p.getSecond() == null) {
-        future.completeExceptionally(
-          new NoServerForRegionException(Bytes.toStringBinary(regionName)));
-        return;
-      }
+    getRegionLocation(regionName).whenComplete(
+      (location, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        ServerName serverName = location.getServerName();
+        if (serverName == null) {
+          future.completeExceptionally(new NoServerForRegionException(Bytes
+              .toStringBinary(regionName)));
+          return;
+        }
 
-      this.<Void> newAdminCaller().serverName(p.getSecond())
-          .action((controller, stub) -> this
-              .<FlushRegionRequest, FlushRegionResponse, Void> 
adminCall(controller, stub,
-                
RequestConverter.buildFlushRegionRequest(p.getFirst().getRegionName()),
-                (s, c, req, done) -> s.flushRegion(c, req, done), resp -> 
null))
-          .call().whenComplete((ret, err2) -> {
-            if (err2 != null) {
-              future.completeExceptionally(err2);
-            } else {
-              future.complete(ret);
-            }
-          });
-    });
+        HRegionInfo regionInfo = location.getRegionInfo();
+        this.<Void> newAdminCaller()
+            .serverName(serverName)
+            .action(
+              (controller, stub) -> this.<FlushRegionRequest, 
FlushRegionResponse, Void> adminCall(
+                controller, stub, 
RequestConverter.buildFlushRegionRequest(regionInfo
+                    .getRegionName()), (s, c, req, done) -> s.flushRegion(c, 
req, done),
+                resp -> null)).call().whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
+      });
     return future;
   }
 
   @Override
-  public CompletableFuture<Void> compact(TableName tableName) {
-    return compact(tableName, null, false, CompactType.NORMAL);
-  }
-
-  @Override
-  public CompletableFuture<Void> compact(TableName tableName, byte[] 
columnFamily) {
+  public CompletableFuture<Void> compact(TableName tableName, Optional<byte[]> 
columnFamily) {
     return compact(tableName, columnFamily, false, CompactType.NORMAL);
   }
 
   @Override
-  public CompletableFuture<Void> compactRegion(byte[] regionName) {
-    return compactRegion(regionName, null, false);
-  }
-
-  @Override
-  public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] 
columnFamily) {
+  public CompletableFuture<Void> compactRegion(byte[] regionName, 
Optional<byte[]> columnFamily) {
     return compactRegion(regionName, columnFamily, false);
   }
 
   @Override
-  public CompletableFuture<Void> majorCompact(TableName tableName) {
-    return compact(tableName, null, true, CompactType.NORMAL);
-  }
-
-  @Override
-  public CompletableFuture<Void> majorCompact(TableName tableName, byte[] 
columnFamily) {
+  public CompletableFuture<Void> majorCompact(TableName tableName, 
Optional<byte[]> columnFamily) {
     return compact(tableName, columnFamily, true, CompactType.NORMAL);
   }
 
   @Override
-  public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
-    return compactRegion(regionName, null, true);
-  }
-
-  @Override
-  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] 
columnFamily) {
+  public CompletableFuture<Void> majorCompactRegion(byte[] regionName, 
Optional<byte[]> columnFamily) {
     return compactRegion(regionName, columnFamily, true);
   }
 
@@ -996,7 +866,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
       }
       List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
       if (hRegionInfos != null) {
-        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, 
major, null)));
+        hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, 
major, Optional.empty())));
       }
       CompletableFuture
           .allOf(compactFutures.toArray(new 
CompletableFuture<?>[compactFutures.size()]))
@@ -1011,33 +881,30 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     return future;
   }
 
-  private CompletableFuture<Void> compactRegion(final byte[] regionName, final 
byte[] columnFamily,
-      final boolean major) {
+  private CompletableFuture<Void> compactRegion(byte[] regionName, 
Optional<byte[]> columnFamily,
+      boolean major) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegion(regionName).whenComplete((p, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      if (p == null || p.getFirst() == null) {
-        future.completeExceptionally(
-          new IllegalArgumentException("Invalid region: " + 
Bytes.toStringBinary(regionName)));
-        return;
-      }
-      if (p.getSecond() == null) {
-        // found a region without region server assigned.
-        future.completeExceptionally(
-          new NoServerForRegionException(Bytes.toStringBinary(regionName)));
-        return;
-      }
-      compact(p.getSecond(), p.getFirst(), major, 
columnFamily).whenComplete((ret, err2) -> {
-        if (err2 != null) {
-          future.completeExceptionally(err2);
-        } else {
-          future.complete(ret);
+    getRegionLocation(regionName).whenComplete(
+      (location, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        ServerName serverName = location.getServerName();
+        if (serverName == null) {
+          future.completeExceptionally(new NoServerForRegionException(Bytes
+              .toStringBinary(regionName)));
+          return;
         }
+        compact(location.getServerName(), location.getRegionInfo(), major, 
columnFamily)
+            .whenComplete((ret, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else {
+                future.complete(ret);
+              }
+            });
       });
-    });
     return future;
   }
 
@@ -1045,45 +912,34 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
    * List all region locations for the specific table.
    */
   private CompletableFuture<List<HRegionLocation>> 
getTableHRegionLocations(TableName tableName) {
-    CompletableFuture<List<HRegionLocation>> future = new 
CompletableFuture<>();
     if (TableName.META_TABLE_NAME.equals(tableName)) {
+      CompletableFuture<List<HRegionLocation>> future = new 
CompletableFuture<>();
       // For meta table, we use zk to fetch all locations.
       AsyncRegistry registry = 
AsyncRegistryFactory.getRegistry(connection.getConfiguration());
-      registry.getMetaRegionLocation().whenComplete((metaRegions, err) -> {
-        if (err != null) {
-          future.completeExceptionally(err);
-        } else if (metaRegions == null || metaRegions.isEmpty()
-            || metaRegions.getDefaultRegionLocation() == null) {
-          future.completeExceptionally(new IOException("meta region does not 
found"));
-        } else {
-          
future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
-        }
-        // close the registry.
-        IOUtils.closeQuietly(registry);
-      });
+      registry.getMetaRegionLocation().whenComplete(
+        (metaRegions, err) -> {
+          if (err != null) {
+            future.completeExceptionally(err);
+          } else if (metaRegions == null || metaRegions.isEmpty()
+              || metaRegions.getDefaultRegionLocation() == null) {
+            future.completeExceptionally(new IOException("meta region does not 
found"));
+          } else {
+            
future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
+          }
+          // close the registry.
+          IOUtils.closeQuietly(registry);
+        });
+      return future;
     } else {
       // For non-meta table, we fetch all locations by scanning hbase:meta 
table
-      AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, 
Optional.of(tableName))
-          .whenComplete((locations, err) -> {
-            if (err != null) {
-              future.completeExceptionally(err);
-            } else if (locations == null || locations.isEmpty()) {
-              future.complete(Collections.emptyList());
-            } else {
-              List<HRegionLocation> regionLocations = locations.stream()
-                  .map(loc -> new HRegionLocation(loc.getFirst(), 
loc.getSecond()))
-                  .collect(Collectors.toList());
-              future.complete(regionLocations);
-            }
-          });
+      return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, 
Optional.of(tableName));
     }
-    return future;
   }
 
   /**
    * Compact column family of a table, Asynchronous operation even if 
CompletableFuture.get()
    */
-  private CompletableFuture<Void> compact(final TableName tableName, final 
byte[] columnFamily,
+  private CompletableFuture<Void> compact(final TableName tableName, 
Optional<byte[]> columnFamily,
       final boolean major, CompactType compactType) {
     if (CompactType.MOB.equals(compactType)) {
       // TODO support MOB compact.
@@ -1120,13 +976,15 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
    * Compact the region at specific region server.
    */
   private CompletableFuture<Void> compact(final ServerName sn, final 
HRegionInfo hri,
-      final boolean major, final byte[] family) {
-    return this.<Void> newAdminCaller().serverName(sn)
-        .action((controller, stub) -> this
-            .<CompactRegionRequest, CompactRegionResponse, Void> 
adminCall(controller, stub,
-              RequestConverter.buildCompactRegionRequest(hri.getRegionName(), 
major, family),
-              (s, c, req, done) -> s.compactRegion(c, req, done), resp -> 
null))
-        .call();
+      final boolean major, Optional<byte[]> columnFamily) {
+    return this
+        .<Void> newAdminCaller()
+        .serverName(sn)
+        .action(
+          (controller, stub) -> this.<CompactRegionRequest, 
CompactRegionResponse, Void> adminCall(
+            controller, stub, 
RequestConverter.buildCompactRegionRequest(hri.getRegionName(),
+              major, columnFamily), (s, c, req, done) -> s.compactRegion(c, 
req, done),
+            resp -> null)).call();
   }
 
   private byte[] toEncodeRegionName(byte[] regionName) {
@@ -1140,32 +998,29 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
 
   private void checkAndGetTableName(byte[] encodeRegionName, 
AtomicReference<TableName> tableName,
       CompletableFuture<TableName> result) {
-    getRegion(encodeRegionName).whenComplete((p, err) -> {
-      if (err != null) {
-        result.completeExceptionally(err);
-        return;
-      }
-      if (p == null) {
-        result.completeExceptionally(new UnknownRegionException(
-            "Can't invoke merge on unknown region " + 
Bytes.toStringBinary(encodeRegionName)));
-        return;
-      }
-      if (p.getFirst().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
-        result.completeExceptionally(
-          new IllegalArgumentException("Can't invoke merge on non-default 
regions directly"));
-        return;
-      }
-      if (!tableName.compareAndSet(null, p.getFirst().getTable())) {
-        if (!tableName.get().equals(p.getFirst().getTable())) {
-          // tables of this two region should be same.
-          result.completeExceptionally(
-            new IllegalArgumentException("Cannot merge regions from two 
different tables "
-                + tableName.get() + " and " + p.getFirst().getTable()));
-        } else {
-          result.complete(tableName.get());
+    getRegionLocation(encodeRegionName).whenComplete(
+      (location, err) -> {
+        if (err != null) {
+          result.completeExceptionally(err);
+          return;
         }
-      }
-    });
+        HRegionInfo regionInfo = location.getRegionInfo();
+        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+          result.completeExceptionally(new IllegalArgumentException(
+              "Can't invoke merge on non-default regions directly"));
+          return;
+        }
+        if (!tableName.compareAndSet(null, regionInfo.getTable())) {
+          if (!tableName.get().equals(regionInfo.getTable())) {
+            // tables of this two region should be same.
+            result.completeExceptionally(new IllegalArgumentException(
+                "Cannot merge regions from two different tables " + 
tableName.get() + " and "
+                    + regionInfo.getTable()));
+          } else {
+            result.complete(tableName.get());
+          }
+        }
+      });
   }
 
   private CompletableFuture<TableName> checkRegionsAndGetTableName(byte[] 
encodeRegionNameA,
@@ -1249,7 +1104,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
                       if (hri == null || hri.isSplitParent()
                           || hri.getReplicaId() != 
HRegionInfo.DEFAULT_REPLICA_ID)
                         continue;
-                      splitFutures.add(split(h.getServerName(), hri, null));
+                      splitFutures.add(split(h.getServerName(), hri, 
Optional.empty()));
                     }
                   }
                 }
@@ -1272,11 +1127,6 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> splitRegion(byte[] regionName) {
-    return splitRegion(regionName, null);
-  }
-
-  @Override
   public CompletableFuture<Void> split(TableName tableName, byte[] splitPoint) 
{
     CompletableFuture<Void> result = new CompletableFuture<>();
     if (splitPoint == null) {
@@ -1290,7 +1140,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
             result.completeExceptionally(new IllegalArgumentException(
                 "Region does not found: rowKey=" + 
Bytes.toStringBinary(splitPoint)));
           } else {
-            splitRegion(loc.getRegionInfo().getRegionName(), splitPoint)
+            splitRegion(loc.getRegionInfo().getRegionName(), 
Optional.of(splitPoint))
                 .whenComplete((ret, err2) -> {
                   if (err2 != null) {
                     result.completeExceptionally(err2);
@@ -1305,182 +1155,149 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] 
splitPoint) {
+  public CompletableFuture<Void> splitRegion(byte[] regionName, 
Optional<byte[]> splitPoint) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegion(regionName).whenComplete((p, err) -> {
-      if (p == null) {
-        future.completeExceptionally(
-          new IllegalArgumentException("Invalid region: " + 
Bytes.toStringBinary(regionName)));
-        return;
-      }
-      if (p.getFirst() != null && p.getFirst().getReplicaId() != 
HRegionInfo.DEFAULT_REPLICA_ID) {
-        future.completeExceptionally(new IllegalArgumentException("Can't split 
replicas directly. "
-            + "Replicas are auto-split when their primary is split."));
-        return;
-      }
-      if (p.getSecond() == null) {
-        future.completeExceptionally(
-          new NoServerForRegionException(Bytes.toStringBinary(regionName)));
-        return;
-      }
-      split(p.getSecond(), p.getFirst(), splitPoint).whenComplete((ret, err2) 
-> {
-        if (err2 != null) {
-          future.completeExceptionally(err2);
-        } else {
-          future.complete(ret);
+    getRegionLocation(regionName).whenComplete(
+      (location, err) -> {
+        HRegionInfo regionInfo = location.getRegionInfo();
+        if (regionInfo.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
+          future.completeExceptionally(new IllegalArgumentException(
+              "Can't split replicas directly. "
+                  + "Replicas are auto-split when their primary is split."));
+          return;
         }
+        ServerName serverName = location.getServerName();
+        if (serverName == null) {
+          future.completeExceptionally(new NoServerForRegionException(Bytes
+              .toStringBinary(regionName)));
+          return;
+        }
+        split(serverName, regionInfo, splitPoint).whenComplete((ret, err2) -> {
+          if (err2 != null) {
+            future.completeExceptionally(err2);
+          } else {
+            future.complete(ret);
+          }
+        });
       });
-    });
     return future;
   }
 
-  @VisibleForTesting
-  public CompletableFuture<Void> split(final ServerName sn, final HRegionInfo 
hri,
-      byte[] splitPoint) {
-    if (hri.getStartKey() != null && splitPoint != null
-        && Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
-      return failedFuture(
-        new IllegalArgumentException("should not give a splitkey which equals 
to startkey!"));
+  private CompletableFuture<Void> split(final ServerName sn, final HRegionInfo 
hri,
+      Optional<byte[]> splitPoint) {
+    if (hri.getStartKey() != null && splitPoint.isPresent()
+        && Bytes.compareTo(hri.getStartKey(), splitPoint.get()) == 0) {
+      return failedFuture(new IllegalArgumentException(
+          "should not give a splitkey which equals to startkey!"));
     }
-    return this.<Void> newAdminCaller()
+    return this
+        .<Void> newAdminCaller()
         .action(
           (controller, stub) -> this.<SplitRegionRequest, SplitRegionResponse, 
Void> adminCall(
-            controller, stub, 
ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), splitPoint),
+            controller, stub,
+            ProtobufUtil.buildSplitRegionRequest(hri.getRegionName(), 
splitPoint),
             (s, c, req, done) -> s.splitRegion(controller, req, done), resp -> 
null))
         .serverName(sn).call();
   }
 
-  /**
-   * Turn regionNameOrEncodedRegionName into regionName, if region does not 
found, then it'll throw
-   * an IllegalArgumentException wrapped by a {@link CompletableFuture}
-   * @param regionNameOrEncodedRegionName
-   * @return
-   */
-  CompletableFuture<byte[]> getRegionName(byte[] 
regionNameOrEncodedRegionName) {
-    CompletableFuture<byte[]> future = new CompletableFuture<>();
-    if (Bytes
-        .equals(regionNameOrEncodedRegionName, 
HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
-        || Bytes.equals(regionNameOrEncodedRegionName,
-          HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
-      future.complete(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
-      return future;
-    }
-
-    getRegion(regionNameOrEncodedRegionName).whenComplete((p, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      }
-      if (p != null && p.getFirst() != null) {
-        future.complete(p.getFirst().getRegionName());
-      } else {
-        future.completeExceptionally(
-          new IllegalArgumentException("Invalid region name or encoded region 
name: "
-              + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
-      }
-    });
-    return future;
-  }
-
   @Override
   public CompletableFuture<Void> assign(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionName(regionName).whenComplete((fullRegionName, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else {
+    getRegionInfo(regionName).whenComplete(
+      (regionInfo, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
         this.<Void> newMasterCaller()
             .action(
               ((controller, stub) -> this.<AssignRegionRequest, 
AssignRegionResponse, Void> call(
-                controller, stub, 
RequestConverter.buildAssignRegionRequest(fullRegionName),
-                (s, c, req, done) -> s.assignRegion(c, req, done), resp -> 
null)))
-            .call().whenComplete((ret, err2) -> {
+                controller, stub, 
RequestConverter.buildAssignRegionRequest(regionInfo
+                    .getRegionName()), (s, c, req, done) -> s.assignRegion(c, 
req, done),
+                resp -> null))).call().whenComplete((ret, err2) -> {
               if (err2 != null) {
                 future.completeExceptionally(err2);
               } else {
                 future.complete(ret);
               }
             });
-      }
-    });
+      });
     return future;
   }
 
   @Override
-  public CompletableFuture<Void> unassign(byte[] regionName, boolean force) {
+  public CompletableFuture<Void> unassign(byte[] regionName, boolean forcible) 
{
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionName(regionName).whenComplete((fullRegionName, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else {
+    getRegionInfo(regionName).whenComplete(
+      (regionInfo, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
         this.<Void> newMasterCaller()
-            .action(((controller, stub) -> this
-                .<UnassignRegionRequest, UnassignRegionResponse, Void> 
call(controller, stub,
-                  RequestConverter.buildUnassignRegionRequest(fullRegionName, 
force),
-                  (s, c, req, done) -> s.unassignRegion(c, req, done), resp -> 
null)))
-            .call().whenComplete((ret, err2) -> {
+            .action(
+              ((controller, stub) -> this
+                  .<UnassignRegionRequest, UnassignRegionResponse, Void> 
call(controller, stub,
+                    
RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), 
forcible),
+                    (s, c, req, done) -> s.unassignRegion(c, req, done), resp 
-> null))).call()
+            .whenComplete((ret, err2) -> {
               if (err2 != null) {
                 future.completeExceptionally(err2);
               } else {
                 future.complete(ret);
               }
             });
-      }
-    });
+      });
     return future;
   }
 
   @Override
   public CompletableFuture<Void> offline(byte[] regionName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionName(regionName).whenComplete((fullRegionName, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else {
+    getRegionInfo(regionName).whenComplete(
+      (regionInfo, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
         this.<Void> newMasterCaller()
             .action(
               ((controller, stub) -> this.<OfflineRegionRequest, 
OfflineRegionResponse, Void> call(
-                controller, stub, 
RequestConverter.buildOfflineRegionRequest(fullRegionName),
-                (s, c, req, done) -> s.offlineRegion(c, req, done), resp -> 
null)))
-            .call().whenComplete((ret, err2) -> {
+                controller, stub, 
RequestConverter.buildOfflineRegionRequest(regionInfo
+                    .getRegionName()), (s, c, req, done) -> s.offlineRegion(c, 
req, done),
+                resp -> null))).call().whenComplete((ret, err2) -> {
               if (err2 != null) {
                 future.completeExceptionally(err2);
               } else {
                 future.complete(ret);
               }
             });
-      }
-    });
+      });
     return future;
   }
 
   @Override
-  public CompletableFuture<Void> move(byte[] regionName, byte[] 
destServerName) {
+  public CompletableFuture<Void> move(byte[] regionName, Optional<ServerName> 
destServerName) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    getRegionName(regionName).whenComplete((fullRegionName, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else {
-        final MoveRegionRequest request;
-        try {
-          request = RequestConverter.buildMoveRegionRequest(
-            Bytes.toBytes(HRegionInfo.encodeRegionName(fullRegionName)), 
destServerName);
-        } catch (DeserializationException e) {
-          future.completeExceptionally(e);
+    getRegionInfo(regionName).whenComplete(
+      (regionInfo, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
           return;
         }
         this.<Void> newMasterCaller()
-            .action((controller, stub) -> this.<MoveRegionRequest, 
MoveRegionResponse, Void> call(
-              controller, stub, request, (s, c, req, done) -> s.moveRegion(c, 
req, done),
-              resp -> null))
-            .call().whenComplete((ret, err2) -> {
+            .action(
+              (controller, stub) -> this.<MoveRegionRequest, 
MoveRegionResponse, Void> call(
+                controller, stub, RequestConverter.buildMoveRegionRequest(
+                  regionInfo.getEncodedNameAsBytes(), destServerName), (s, c, 
req, done) -> s
+                    .moveRegion(c, req, done), resp -> 
null)).call().whenComplete((ret, err2) -> {
               if (err2 != null) {
                 future.completeExceptionally(err2);
               } else {
                 future.complete(ret);
               }
             });
-      }
-    });
+      });
     return future;
   }
 
@@ -1644,17 +1461,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<List<ReplicationPeerDescription>> 
listReplicationPeers() {
-    return listReplicationPeers((Pattern) null);
-  }
-
-  @Override
-  public CompletableFuture<List<ReplicationPeerDescription>> 
listReplicationPeers(String regex) {
-    return listReplicationPeers(Pattern.compile(regex));
-  }
-
-  @Override
-  public CompletableFuture<List<ReplicationPeerDescription>> 
listReplicationPeers(Pattern pattern) {
+  public CompletableFuture<List<ReplicationPeerDescription>> 
listReplicationPeers(Optional<Pattern> pattern) {
     return this
         .<List<ReplicationPeerDescription>> newMasterCaller()
         .action(
@@ -1676,18 +1483,17 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
       (tables, error) -> {
         if (!completeExceptionally(future, error)) {
           List<TableCFs> replicatedTableCFs = new ArrayList<>();
-          Arrays.asList(tables).forEach(
-            table -> {
-              Map<String, Integer> cfs = new HashMap<>();
-              Stream.of(table.getColumnFamilies())
-                  .filter(column -> column.getScope() != 
HConstants.REPLICATION_SCOPE_LOCAL)
-                  .forEach(column -> {
-                    cfs.put(column.getNameAsString(), column.getScope());
-                  });
-              if (!cfs.isEmpty()) {
-                replicatedTableCFs.add(new TableCFs(table.getTableName(), 
cfs));
-              }
-            });
+          tables.forEach(table -> {
+            Map<String, Integer> cfs = new HashMap<>();
+            Stream.of(table.getColumnFamilies())
+                .filter(column -> column.getScope() != 
HConstants.REPLICATION_SCOPE_LOCAL)
+                .forEach(column -> {
+                  cfs.put(column.getNameAsString(), column.getScope());
+                });
+            if (!cfs.isEmpty()) {
+              replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
+            }
+          });
           future.complete(replicatedTableCFs);
         }
       });
@@ -1707,8 +1513,8 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
-    SnapshotProtos.SnapshotDescription snapshot =
-        ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
+    SnapshotProtos.SnapshotDescription snapshot = ProtobufUtil
+        .createHBaseProtosSnapshotDesc(snapshotDesc);
     try {
       ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
     } catch (IllegalArgumentException e) {
@@ -1717,10 +1523,10 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     CompletableFuture<Void> future = new CompletableFuture<>();
     final SnapshotRequest request = 
SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
     this.<Long> newMasterCaller()
-        .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, 
Long> call(
-          controller, stub, request, (s, c, req, done) -> s.snapshot(c, req, 
done),
-          resp -> resp.getExpectedTimeout()))
-        .call().whenComplete((expectedTimeout, err) -> {
+        .action(
+          (controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> 
call(controller,
+            stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
+            resp -> 
resp.getExpectedTimeout())).call().whenComplete((expectedTimeout, err) -> {
           if (err != null) {
             future.completeExceptionally(err);
             return;
@@ -1734,25 +1540,24 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
             @Override
             public void run(Timeout timeout) throws Exception {
               if (EnvironmentEdgeManager.currentTime() < endTime) {
-                isSnapshotFinished(snapshotDesc).whenComplete((done, err) -> {
-                  if (err != null) {
-                    future.completeExceptionally(err);
+                isSnapshotFinished(snapshotDesc).whenComplete((done, err2) -> {
+                  if (err2 != null) {
+                    future.completeExceptionally(err2);
                   } else if (done) {
                     future.complete(null);
                   } else {
                     // retry again after pauseTime.
-                    long pauseTime = ConnectionUtils
-                        .getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), 
++tries);
-                    pauseTime = Math.min(pauseTime, maxPauseTime);
-                    AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
-                      TimeUnit.MILLISECONDS);
-                  }
-                });
+                  long pauseTime = ConnectionUtils.getPauseTime(
+                    TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
+                  pauseTime = Math.min(pauseTime, maxPauseTime);
+                  AsyncConnectionImpl.RETRY_TIMER
+                      .newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
+                }
+              } );
               } else {
-                future.completeExceptionally(new SnapshotCreationException(
-                    "Snapshot '" + snapshot.getName() + "' wasn't completed in 
expectedTime:"
-                        + expectedTimeout + " ms",
-                    snapshotDesc));
+                future.completeExceptionally(new 
SnapshotCreationException("Snapshot '"
+                    + snapshot.getName() + "' wasn't completed in 
expectedTime:" + expectedTimeout
+                    + " ms", snapshotDesc));
               }
             }
           };
@@ -1763,13 +1568,15 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
 
   @Override
   public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription 
snapshot) {
-    return this.<Boolean> newMasterCaller()
-        .action((controller, stub) -> this
-            .<IsSnapshotDoneRequest, IsSnapshotDoneResponse, Boolean> 
call(controller, stub,
-              IsSnapshotDoneRequest.newBuilder()
-                  
.setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
-              (s, c, req, done) -> s.isSnapshotDone(c, req, done), resp -> 
resp.getDone()))
-        .call();
+    return this
+        .<Boolean> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<IsSnapshotDoneRequest, 
IsSnapshotDoneResponse, Boolean> call(
+            controller,
+            stub,
+            IsSnapshotDoneRequest.newBuilder()
+                
.setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, 
c,
+                req, done) -> s.isSnapshotDone(c, req, done), resp -> 
resp.getDone())).call();
   }
 
   @Override
@@ -1780,109 +1587,110 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     return restoreSnapshot(snapshotName, takeFailSafeSnapshot);
   }
 
-  private CompletableFuture<Void> restoreSnapshotWithFailSafe(String 
snapshotName,
-      TableName tableName, boolean takeFailSafeSnapshot) {
+  @Override
+  public CompletableFuture<Void> restoreSnapshot(String snapshotName, boolean 
takeFailSafeSnapshot) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    listSnapshots(Pattern.compile(snapshotName)).whenComplete(
+      (snapshotDescriptions, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        TableName tableName = null;
+        if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
+          for (SnapshotDescription snap : snapshotDescriptions) {
+            if (snap.getName().equals(snapshotName)) {
+              tableName = snap.getTableName();
+              break;
+            }
+          }
+        }
+        if (tableName == null) {
+          future.completeExceptionally(new RestoreSnapshotException(
+              "Unable to find the table name for snapshot=" + snapshotName));
+          return;
+        }
+        final TableName finalTableName = tableName;
+        tableExists(finalTableName)
+            .whenComplete((exists, err2) -> {
+              if (err2 != null) {
+                future.completeExceptionally(err2);
+              } else if (!exists) {
+                // if table does not exist, then just clone snapshot into new 
table.
+              completeConditionalOnFuture(future,
+                internalRestoreSnapshot(snapshotName, finalTableName));
+            } else {
+              isTableDisabled(finalTableName).whenComplete(
+                (disabled, err4) -> {
+                  if (err4 != null) {
+                    future.completeExceptionally(err4);
+                  } else if (!disabled) {
+                    future.completeExceptionally(new 
TableNotDisabledException(finalTableName));
+                  } else {
+                    completeConditionalOnFuture(future,
+                      restoreSnapshot(snapshotName, finalTableName, 
takeFailSafeSnapshot));
+                  }
+                });
+            }
+          } );
+      });
+    return future;
+  }
+
+  private CompletableFuture<Void> restoreSnapshot(String snapshotName, 
TableName tableName,
+      boolean takeFailSafeSnapshot) {
     if (takeFailSafeSnapshot) {
       CompletableFuture<Void> future = new CompletableFuture<>();
       // Step.1 Take a snapshot of the current state
-      String failSafeSnapshotSnapshotNameFormat =
-          
this.connection.getConfiguration().get(HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
-            HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
-      final String failSafeSnapshotSnapshotName =
-          failSafeSnapshotSnapshotNameFormat.replace("{snapshot.name}", 
snapshotName)
-              .replace("{table.name}", 
tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
-              .replace("{restore.timestamp}", 
String.valueOf(EnvironmentEdgeManager.currentTime()));
+      String failSafeSnapshotSnapshotNameFormat = 
this.connection.getConfiguration().get(
+        HConstants.SNAPSHOT_RESTORE_FAILSAFE_NAME,
+        HConstants.DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME);
+      final String failSafeSnapshotSnapshotName = 
failSafeSnapshotSnapshotNameFormat
+          .replace("{snapshot.name}", snapshotName)
+          .replace("{table.name}", 
tableName.toString().replace(TableName.NAMESPACE_DELIM, '.'))
+          .replace("{restore.timestamp}", 
String.valueOf(EnvironmentEdgeManager.currentTime()));
       LOG.info("Taking restore-failsafe snapshot: " + 
failSafeSnapshotSnapshotName);
       snapshot(failSafeSnapshotSnapshotName, tableName).whenComplete((ret, 
err) -> {
         if (err != null) {
           future.completeExceptionally(err);
         } else {
           // Step.2 Restore snapshot
-          internalRestoreSnapshot(snapshotName, 
tableName).whenComplete((void2, err2) -> {
-            if (err2 != null) {
-              // Step.3.a Something went wrong during the restore and try to 
rollback.
-              internalRestoreSnapshot(failSafeSnapshotSnapshotName, tableName)
-                  .whenComplete((void3, err3) -> {
-                    if (err3 != null) {
-                      future.completeExceptionally(err3);
-                    } else {
-                      String msg =
-                          "Restore snapshot=" + snapshotName + " failed. 
Rollback to snapshot="
-                              + failSafeSnapshotSnapshotName + " succeeded.";
-                      future.completeExceptionally(new 
RestoreSnapshotException(msg));
-                    }
-                  });
-            } else {
-              // Step.3.b If the restore is succeeded, delete the pre-restore 
snapshot.
-              LOG.info("Deleting restore-failsafe snapshot: " + 
failSafeSnapshotSnapshotName);
-              deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete((ret3, 
err3) -> {
-                if (err3 != null) {
-                  LOG.error(
-                    "Unable to remove the failsafe snapshot: " + 
failSafeSnapshotSnapshotName,
-                    err3);
-                  future.completeExceptionally(err3);
-                } else {
-                  future.complete(ret3);
-                }
-              });
-            }
-          });
+        internalRestoreSnapshot(snapshotName, tableName).whenComplete((void2, 
err2) -> {
+          if (err2 != null) {
+            // Step.3.a Something went wrong during the restore and try to 
rollback.
+          internalRestoreSnapshot(failSafeSnapshotSnapshotName, 
tableName).whenComplete(
+            (void3, err3) -> {
+              if (err3 != null) {
+                future.completeExceptionally(err3);
+              } else {
+                String msg = "Restore snapshot=" + snapshotName + " failed. 
Rollback to snapshot="
+                    + failSafeSnapshotSnapshotName + " succeeded.";
+                future.completeExceptionally(new 
RestoreSnapshotException(msg));
+              }
+            });
+        } else {
+          // Step.3.b If the restore is succeeded, delete the pre-restore 
snapshot.
+          LOG.info("Deleting restore-failsafe snapshot: " + 
failSafeSnapshotSnapshotName);
+          deleteSnapshot(failSafeSnapshotSnapshotName).whenComplete(
+            (ret3, err3) -> {
+              if (err3 != null) {
+                LOG.error(
+                  "Unable to remove the failsafe snapshot: " + 
failSafeSnapshotSnapshotName, err3);
+                future.completeExceptionally(err3);
+              } else {
+                future.complete(ret3);
+              }
+            });
         }
-      });
+      } );
+      }
+    } );
       return future;
     } else {
       return internalRestoreSnapshot(snapshotName, tableName);
     }
   }
 
-  @Override
-  public CompletableFuture<Void> restoreSnapshot(String snapshotName,
-      boolean takeFailSafeSnapshot) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    listSnapshots(snapshotName).whenComplete((snapshotDescriptions, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      TableName tableName = null;
-      if (snapshotDescriptions != null && !snapshotDescriptions.isEmpty()) {
-        for (SnapshotDescription snap : snapshotDescriptions) {
-          if (snap.getName().equals(snapshotName)) {
-            tableName = snap.getTableName();
-            break;
-          }
-        }
-      }
-      if (tableName == null) {
-        future.completeExceptionally(new RestoreSnapshotException(
-            "Unable to find the table name for snapshot=" + snapshotName));
-        return;
-      }
-      final TableName finalTableName = tableName;
-      tableExists(finalTableName).whenComplete((exists, err2) -> {
-        if (err2 != null) {
-          future.completeExceptionally(err2);
-        } else if (!exists) {
-          // if table does not exist, then just clone snapshot into new table.
-          completeConditionalOnFuture(future,
-              internalRestoreSnapshot(snapshotName, finalTableName));
-        } else {
-          isTableDisabled(finalTableName).whenComplete((disabled, err4) -> {
-            if (err4 != null) {
-              future.completeExceptionally(err4);
-            } else if (!disabled) {
-              future.completeExceptionally(new 
TableNotDisabledException(finalTableName));
-            } else {
-              completeConditionalOnFuture(future,
-                  restoreSnapshotWithFailSafe(snapshotName, finalTableName, 
takeFailSafeSnapshot));
-            }
-          });
-        }
-      });
-    });
-    return future;
-  }
-
   private <T> void completeConditionalOnFuture(CompletableFuture<T> 
dependentFuture,
       CompletableFuture<T> parentFuture) {
     parentFuture.whenComplete((res, err) -> {
@@ -1909,8 +1717,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     return future;
   }
 
-  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName,
-      TableName tableName) {
+  private CompletableFuture<Void> internalRestoreSnapshot(String snapshotName, 
TableName tableName) {
     SnapshotProtos.SnapshotDescription snapshot = 
SnapshotProtos.SnapshotDescription.newBuilder()
         .setName(snapshotName).setTable(tableName.getNameAsString()).build();
     try {
@@ -1918,86 +1725,78 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
     } catch (IllegalArgumentException e) {
       return failedFuture(e);
     }
-    return waitProcedureResult(
-        this.<Long> newMasterCaller()
-            .action((controller, stub) -> this
-                .<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> 
call(controller, stub,
-                    RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
-                        
.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(),
-                    (s, c, req, done) -> s.restoreSnapshot(c, req, done),
-                    (resp) -> resp.getProcId()))
-            .call());
+    return waitProcedureResult(this
+        .<Long> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<RestoreSnapshotRequest, 
RestoreSnapshotResponse, Long> call(
+            controller, stub, 
RestoreSnapshotRequest.newBuilder().setSnapshot(snapshot)
+                
.setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build(), (s, c, req,
+                done) -> s.restoreSnapshot(c, req, done), (resp) -> 
resp.getProcId())).call());
   }
 
   @Override
   public CompletableFuture<List<SnapshotDescription>> listSnapshots() {
-    return this.<List<SnapshotDescription>> newMasterCaller()
-        .action((controller, stub) -> this
-            .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, 
List<SnapshotDescription>> call(
-              controller, stub, 
GetCompletedSnapshotsRequest.newBuilder().build(),
-              (s, c, req, done) -> s.getCompletedSnapshots(c, req, done),
-              resp -> 
resp.getSnapshotsList().stream().map(ProtobufUtil::createSnapshotDesc)
-                  .collect(Collectors.toList())))
+    return this
+        .<List<SnapshotDescription>> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<GetCompletedSnapshotsRequest, GetCompletedSnapshotsResponse, 
List<SnapshotDescription>> call(
+                controller, stub, 
GetCompletedSnapshotsRequest.newBuilder().build(), (s, c, req,
+                    done) -> s.getCompletedSnapshots(c, req, done), resp -> 
resp.getSnapshotsList()
+                    
.stream().map(ProtobufUtil::createSnapshotDesc).collect(Collectors.toList())))
         .call();
   }
 
   @Override
-  public CompletableFuture<List<SnapshotDescription>> listSnapshots(String 
regex) {
-    return listSnapshots(Pattern.compile(regex));
-  }
-
-  @Override
   public CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern 
pattern) {
     CompletableFuture<List<SnapshotDescription>> future = new 
CompletableFuture<>();
-    listSnapshots().whenComplete((snapshotDescList, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      if (snapshotDescList == null || snapshotDescList.isEmpty()) {
-        future.complete(Collections.emptyList());
-        return;
-      }
-      future.complete(snapshotDescList.stream()
-          .filter(snap -> 
pattern.matcher(snap.getName()).matches()).collect(Collectors.toList()));
-    });
+    listSnapshots()
+        .whenComplete(
+          (snapshotDescList, err) -> {
+            if (err != null) {
+              future.completeExceptionally(err);
+              return;
+            }
+            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
+              future.complete(Collections.emptyList());
+              return;
+            }
+            future.complete(snapshotDescList.stream()
+                .filter(snap -> pattern.matcher(snap.getName()).matches())
+                .collect(Collectors.toList()));
+          });
     return future;
   }
 
   @Override
-  public CompletableFuture<List<SnapshotDescription>> 
listTableSnapshots(String tableNameRegex,
-      String snapshotNameRegex) {
-    return listTableSnapshots(Pattern.compile(tableNameRegex), 
Pattern.compile(snapshotNameRegex));
-  }
-
-  @Override
   public CompletableFuture<List<SnapshotDescription>> 
listTableSnapshots(Pattern tableNamePattern,
       Pattern snapshotNamePattern) {
     CompletableFuture<List<SnapshotDescription>> future = new 
CompletableFuture<>();
-    listTableNames(tableNamePattern, false).whenComplete((tableNames, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-        return;
-      }
-      if (tableNames == null || tableNames.length <= 0) {
-        future.complete(Collections.emptyList());
-        return;
-      }
-      List<TableName> tableNameList = Arrays.asList(tableNames);
-      listSnapshots(snapshotNamePattern).whenComplete((snapshotDescList, err2) 
-> {
-        if (err2 != null) {
-          future.completeExceptionally(err2);
+    listTableNames(Optional.ofNullable(tableNamePattern), false).whenComplete(
+      (tableNames, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
           return;
         }
-        if (snapshotDescList == null || snapshotDescList.isEmpty()) {
+        if (tableNames == null || tableNames.size() <= 0) {
           future.complete(Collections.emptyList());
           return;
         }
-        future.complete(snapshotDescList.stream()
-            .filter(snap -> (snap != null && 
tableNameList.contains(snap.getTableName())))
-            .collect(Collectors.toList()));
+        listSnapshots(snapshotNamePattern).whenComplete(
+          (snapshotDescList, err2) -> {
+            if (err2 != null) {
+              future.completeExceptionally(err2);
+              return;
+            }
+            if (snapshotDescList == null || snapshotDescList.isEmpty()) {
+              future.complete(Collections.emptyList());
+              return;
+            }
+            future.complete(snapshotDescList.stream()
+                .filter(snap -> (snap != null && 
tableNames.contains(snap.getTableName())))
+                .collect(Collectors.toList()));
+          });
       });
-    });
     return future;
   }
 
@@ -2007,47 +1806,46 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<Void> deleteSnapshots(String regex) {
-    return deleteSnapshots(Pattern.compile(regex));
-  }
-
-  @Override
   public CompletableFuture<Void> deleteSnapshots(Pattern snapshotNamePattern) {
     return deleteTableSnapshots(null, snapshotNamePattern);
   }
 
   @Override
-  public CompletableFuture<Void> deleteTableSnapshots(String tableNameRegex,
-      String snapshotNameRegex) {
-    return deleteTableSnapshots(Pattern.compile(tableNameRegex),
-      Pattern.compile(snapshotNameRegex));
-  }
-
-  @Override
   public CompletableFuture<Void> deleteTableSnapshots(Pattern tableNamePattern,
       Pattern snapshotNamePattern) {
     CompletableFuture<Void> future = new CompletableFuture<>();
-    listTableSnapshots(tableNamePattern, snapshotNamePattern)
-        .whenComplete(((snapshotDescriptions, err) -> {
-          if (err != null) {
-            future.completeExceptionally(err);
-            return;
-          }
-          if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
-            future.complete(null);
-            return;
-          }
-          List<CompletableFuture<Void>> deleteSnapshotFutures = new 
ArrayList<>();
-          snapshotDescriptions
-              .forEach(snapDesc -> 
deleteSnapshotFutures.add(internalDeleteSnapshot(snapDesc)));
-          CompletableFuture
-              .allOf(deleteSnapshotFutures
-                  .toArray(new 
CompletableFuture<?>[deleteSnapshotFutures.size()]))
-              .thenAccept(v -> future.complete(v));
-        }));
+    listTableSnapshots(tableNamePattern, snapshotNamePattern).whenComplete(
+      ((snapshotDescriptions, err) -> {
+        if (err != null) {
+          future.completeExceptionally(err);
+          return;
+        }
+        if (snapshotDescriptions == null || snapshotDescriptions.isEmpty()) {
+          future.complete(null);
+          return;
+        }
+        List<CompletableFuture<Void>> deleteSnapshotFutures = new 
ArrayList<>();
+        snapshotDescriptions.forEach(snapDesc -> deleteSnapshotFutures
+            .add(internalDeleteSnapshot(snapDesc)));
+        CompletableFuture.allOf(
+          deleteSnapshotFutures.toArray(new 
CompletableFuture<?>[deleteSnapshotFutures.size()]))
+            .thenAccept(v -> future.complete(v));
+      }));
     return future;
   }
 
+  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription 
snapshot) {
+    return this
+        .<Void> newMasterCaller()
+        .action(
+          (controller, stub) -> this.<DeleteSnapshotRequest, 
DeleteSnapshotResponse, Void> call(
+            controller,
+            stub,
+            DeleteSnapshotRequest.newBuilder()
+                
.setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(), (s, 
c,
+                req, done) -> s.deleteSnapshot(c, req, done), resp -> 
null)).call();
+  }
+
   @Override
   public CompletableFuture<Void> execProcedure(String signature, String 
instance,
       Map<String, String> props) {
@@ -2072,9 +1870,9 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
             @Override
             public void run(Timeout timeout) throws Exception {
               if (EnvironmentEdgeManager.currentTime() < endTime) {
-                isProcedureFinished(signature, instance, 
props).whenComplete((done, err) -> {
-                  if (err != null) {
-                    future.completeExceptionally(err);
+                isProcedureFinished(signature, instance, 
props).whenComplete((done, err2) -> {
+                  if (err2 != null) {
+                    future.completeExceptionally(err2);
                     return;
                   }
                   if (done) {
@@ -2137,24 +1935,87 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
   }
 
   @Override
-  public CompletableFuture<ProcedureInfo[]> listProcedures() {
-    return this.<ProcedureInfo[]> newMasterCaller()
-        .action((controller, stub) -> this
-            .<ListProceduresRequest, ListProceduresResponse, ProcedureInfo[]> 
call(controller, stub,
-              ListProceduresRequest.newBuilder().build(),
-              (s, c, req, done) -> s.listProcedures(c, req, done), resp -> 
resp.getProcedureList()
-                  
.stream().map(ProtobufUtil::toProcedureInfo).toArray(ProcedureInfo[]::new)))
-        .call();
+  public CompletableFuture<List<ProcedureInfo>> listProcedures() {
+    return this
+        .<List<ProcedureInfo>> newMasterCaller()
+        .action(
+          (controller, stub) -> this
+              .<ListProceduresRequest, ListProceduresResponse, 
List<ProcedureInfo>> call(
+                controller, stub, ListProceduresRequest.newBuilder().build(),
+                (s, c, req, done) -> s.listProcedures(c, req, done),
+                resp -> 
resp.getProcedureList().stream().map(ProtobufUtil::toProcedureInfo)
+                    .collect(Collectors.toList()))).call();
   }
 
-  private CompletableFuture<Void> internalDeleteSnapshot(SnapshotDescription 
snapshot) {
-    return this.<Void> newMasterCaller()
-        .action((controller, stub) -> this
-            .<DeleteSnapshotRequest, DeleteSnapshotResponse, Void> 
call(controller, stub,
-              DeleteSnapshotRequest.newBuilder()
-                  
.setSnapshot(ProtobufUtil.createHBaseProtosSnapshotDesc(snapshot)).build(),
-              (s, c, req, done) -> s.deleteSnapshot(c, req, done), resp -> 
null))
-        .call();
+  /**
+   * Get the region location for the passed region name. The region name may 
be a full region name
+   * or encoded region name. If the region is not found, then it'll throw an
+   * UnknownRegionException wrapped by a {@link CompletableFuture}
+   * @param regionNameOrEncodedRegionName
+   * @return region location, wrapped by a {@link CompletableFuture}
+   */
+  @VisibleForTesting
+  CompletableFuture<HRegionLocation> getRegionLocation(byte[] 
regionNameOrEncodedRegionName) {
+    if (regionNameOrEncodedRegionName == null) {
+      return failedFuture(new IllegalArgumentException("Passed region name 
can't be null"));
+    }
+    try {
+      CompletableFuture<Optional<HRegionLocation>> future;
+      if (HRegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) {
+        future = 
AsyncMetaTableAccessor.getRegionLocationWithEncodedName(metaTable,
+          regionNameOrEncodedRegionName);
+      } else {
+        future = AsyncMetaTableAccessor.getRegionLocation(metaTable, 
regionNameOrEncodedRegionName);
+      }
+
+      CompletableFuture<HRegionLocation> returnedFuture = new 
CompletableFuture<>();
+      future.whenComplete((location, err) -> {
+        if (err != null) {
+          returnedFuture.completeExceptionally(err);
+          return;
+        }
+        if (!location.isPresent() || location.get().getRegionInfo() == null) {
+          returnedFuture.completeExceptionally(new UnknownRegionException(
+              "Invalid region name or encoded region name: "
+                  + Bytes.toStringBinary(regionNameOrEncodedRegionName)));
+        } else {
+          returnedFuture.complete(location.get());
+        }
+      });
+      return returnedFuture;
+    } catch (IOException e) {
+      return failedFuture(e);
+    }
+  }
+
+  /**
+   * Get the region info for the passed region name. The region name may be a 
full region name or
+   * encoded region name. If the region is not found, then it'll throw an 
UnknownRegionException
+   * wrapped by a {@link CompletableFuture}
+   * @param regionNameOrEncodedRegionName
+   * @return region info, wrapped by a {@link CompletableFuture}
+   */
+  private CompletableFuture<HRegionInfo> getRegionInfo(byte[] 
regionNameOrEncodedRegionName) {
+    if (regionNameOrEncodedRegionName == null) {
+      return failedFuture(new IllegalArgumentException("Passed region name 
can't be null"));
+    }
+
+    if (Bytes.equals(regionNameOrEncodedRegionName,
+      HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+        || Bytes.equals(regionNameOrEncodedRegionName,
+          HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) {
+      return 
CompletableFuture.completedFuture(HRegionInfo.FIRST_META_REGIONINFO);
+    }
+
+    CompletableFuture<HRegionInfo> future = new CompletableFuture<>();
+    getRegionLocation(regionNameOrEncodedRegionName).whenComplete((location, 
err) -> {
+      if (err != null) {
+        future.completeExceptionally(err);
+      } else {
+        future.complete(location.getRegionInfo());
+      }
+    });
+    return future;
   }
 
   private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int 
numRegions) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/28993833/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index b196911..5f8924f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -33,8 +33,10 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -414,18 +416,14 @@ public final class ProtobufUtil {
   }
 
   /**
-   * Get NamespaceDescriptor[] from ListNamespaceDescriptorsResponse protobuf
+   * Get a list of NamespaceDescriptor from ListNamespaceDescriptorsResponse 
protobuf
    * @param proto the ListNamespaceDescriptorsResponse
-   * @return NamespaceDescriptor[]
+   * @return a list of NamespaceDescriptor
    */
-  public static NamespaceDescriptor[] getNamespaceDescriptorArray(
+  public static List<NamespaceDescriptor> toNamespaceDescriptorList(
       ListNamespaceDescriptorsResponse proto) {
-    List<HBaseProtos.NamespaceDescriptor> list = 
proto.getNamespaceDescriptorList();
-    NamespaceDescriptor[] res = new NamespaceDescriptor[list.size()];
-    for (int i = 0; i < list.size(); i++) {
-      res[i] = ProtobufUtil.toNamespaceDescriptor(list.get(i));
-    }
-    return res;
+    return 
proto.getNamespaceDescriptorList().stream().map(ProtobufUtil::toNamespaceDescriptor)
+        .collect(Collectors.toList());
   }
 
   /**
@@ -433,7 +431,7 @@ public final class ProtobufUtil {
    *
    * @param proto the GetTableDescriptorsResponse
    * @return a immutable HTableDescriptor array
-   * @deprecated Use {@link #getTableDescriptorArray} after removing the 
HTableDescriptor
+   * @deprecated Use {@link #toTableDescriptorList} after removing the 
HTableDescriptor
    */
   @Deprecated
   public static HTableDescriptor[] 
getHTableDescriptorArray(GetTableDescriptorsResponse proto) {
@@ -447,18 +445,17 @@ public final class ProtobufUtil {
   }
 
   /**
-   * Get TableDescriptor[] from GetTableDescriptorsResponse protobuf
+   * Get a list of TableDescriptor from GetTableDescriptorsResponse protobuf
    *
    * @param proto the GetTableDescriptorsResponse
-   * @return TableDescriptor[]
+   * @return a list of TableDescriptor
    */
-  public static TableDescriptor[] 
getTableDescriptorArray(GetTableDescriptorsResponse proto) {
-    if (proto == null) return new TableDescriptor[0];
-    return proto.getTableSchemaList()
-                .stream()
-                .map(ProtobufUtil::convertToTableDesc)
-                .toArray(size -> new TableDescriptor[size]);
+  public static List<TableDescriptor> 
toTableDescriptorList(GetTableDescriptorsResponse proto) {
+    if (proto == null) return new ArrayList<>();
+    return 
proto.getTableSchemaList().stream().map(ProtobufUtil::convertToTableDesc)
+        .collect(Collectors.toList());
   }
+
   /**
    * get the split keys in form "byte [][]" from a CreateTableRequest proto
    *
@@ -2398,6 +2395,13 @@ public final class ProtobufUtil {
         
.setQualifier(UnsafeByteOperations.unsafeWrap(tableName.getQualifier())).build();
   }
 
+  public static List<TableName> toTableNameList(List<HBaseProtos.TableName> 
tableNamesList) {
+    if (tableNamesList == null) {
+      return new ArrayList<>();
+    }
+    return 
tableNamesList.stream().map(ProtobufUtil::toTableName).collect(Collectors.toList());
+  }
+
   public static TableName[] getTableNameArray(List<HBaseProtos.TableName> 
tableNamesList) {
     if (tableNamesList == null) {
       return new TableName[0];
@@ -3345,23 +3349,33 @@ public final class ProtobufUtil {
    }
 
   /**
-    * Create a SplitRegionRequest for a given region name
-    *
-    * @param regionName the name of the region to split
-    * @param splitPoint the split point
-    * @return a SplitRegionRequest
-    */
-   public static SplitRegionRequest buildSplitRegionRequest(
-       final byte[] regionName, final byte[] splitPoint) {
-     SplitRegionRequest.Builder builder = SplitRegionRequest.newBuilder();
-     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
-       RegionSpecifierType.REGION_NAME, regionName);
-     builder.setRegion(region);
-     if (splitPoint != null) {
-       builder.setSplitPoint(UnsafeByteOperations.unsafeWrap(splitPoint));
-     }
-     return builder.build();
-   }
+   * Create a SplitRegionRequest for a given region name
+   * @param regionName the name of the region to split
+   * @param splitPoint the split point
+   * @return a SplitRegionRequest
+   * @deprecated Use {@link #buildSplitRegionRequest(byte[], Optional)} 
instead.
+   */
+  @Deprecated
+  public static SplitRegionRequest buildSplitRegionRequest(final byte[] 
regionName,
+      final byte[] splitPoint) {
+    return buildSplitRegionRequest(regionName, 
Optional.ofNullable(splitPoint));
+  }
+
+  /**
+   * Create a SplitRegionRequest for a given region name
+   * @param regionName the name of the region to split
+   * @param splitPoint the split point
+   * @return a SplitRegionRequest
+   */
+  public static SplitRegionRequest buildSplitRegionRequest(byte[] regionName,
+      Optional<byte[]> splitPoint) {
+    SplitRegionRequest.Builder builder = SplitRegionRequest.newBuilder();
+    RegionSpecifier region =
+        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, 
regionName);
+    builder.setRegion(region);
+    splitPoint.ifPresent(sp -> 
builder.setSplitPoint(UnsafeByteOperations.unsafeWrap(sp)));
+    return builder.build();
+  }
 
   public static ProcedureDescription buildProcedureDescription(String 
signature, String instance,
       Map<String, String> props) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/28993833/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 67f7d0a..39ae6a5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.shaded.protobuf;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -919,25 +920,37 @@ public final class RequestConverter {
     builder.setRegionInfo(HRegionInfo.convert(regionInfo));
     return builder.build();
   }
- /**
-  * Create a  CompactRegionRequest for a given region name
-  *
-  * @param regionName the name of the region to get info
-  * @param major indicator if it is a major compaction
-  * @return a CompactRegionRequest
-  */
- public static CompactRegionRequest buildCompactRegionRequest(
-     final byte[] regionName, final boolean major, final byte [] family) {
-   CompactRegionRequest.Builder builder = CompactRegionRequest.newBuilder();
-   RegionSpecifier region = buildRegionSpecifier(
-     RegionSpecifierType.REGION_NAME, regionName);
-   builder.setRegion(region);
-   builder.setMajor(major);
-   if (family != null) {
-     builder.setFamily(UnsafeByteOperations.unsafeWrap(family));
-   }
-   return builder.build();
- }
+
+  /**
+   * Create a CompactRegionRequest for a given region name
+   * @param regionName the name of the region to get info
+   * @param major indicator if it is a major compaction
+   * @param columnFamily the column family to set on the request; may be null
+   * @return a CompactRegionRequest
+   * @deprecated Use {@link #buildCompactRegionRequest(byte[], boolean, 
Optional)} instead.
+   */
+  @Deprecated
+  public static CompactRegionRequest buildCompactRegionRequest(byte[] 
regionName, boolean major,
+      byte[] columnFamily) {
+    return buildCompactRegionRequest(regionName, major, 
Optional.ofNullable(columnFamily));
+  }
+
+  /**
+   * Create a CompactRegionRequest for a given region name
+   * @param regionName the name of the region to get info
+   * @param major indicator if it is a major compaction
+   * @param columnFamily the column family to set on the request, if present
+   * @return a CompactRegionRequest
+   */
+  public static CompactRegionRequest buildCompactRegionRequest(byte[] 
regionName, boolean major,
+      Optional<byte[]> columnFamily) {
+    CompactRegionRequest.Builder builder = CompactRegionRequest.newBuilder();
+    RegionSpecifier region = 
buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+    builder.setRegion(region);
+    builder.setMajor(major);
+    columnFamily.ifPresent(family -> 
builder.setFamily(UnsafeByteOperations.unsafeWrap(family)));
+    return builder.build();
+  }
 
  /**
   * @see {@link #buildRollWALWriterRequest()}
@@ -1084,12 +1097,13 @@ public final class RequestConverter {
 
   /**
    * Create a protocol buffer MoveRegionRequest
-   *
    * @param encodedRegionName
    * @param destServerName
    * @return A MoveRegionRequest
    * @throws DeserializationException
+   * @deprecated Use {@link #buildMoveRegionRequest(byte[], Optional)} instead.
    */
+  @Deprecated
   public static MoveRegionRequest buildMoveRegionRequest(
       final byte [] encodedRegionName, final byte [] destServerName) throws
       DeserializationException {
@@ -1103,6 +1117,22 @@ public final class RequestConverter {
     return builder.build();
   }
 
+  /**
+   * Create a protocol buffer MoveRegionRequest
+   * @param encodedRegionName
+   * @param destServerName the destination server to set on the request, if present
+   * @return A MoveRegionRequest
+   */
+  public static MoveRegionRequest buildMoveRegionRequest(byte[] 
encodedRegionName,
+      Optional<ServerName> destServerName) {
+    MoveRegionRequest.Builder builder = MoveRegionRequest.newBuilder();
+    
builder.setRegion(buildRegionSpecifier(RegionSpecifierType.ENCODED_REGION_NAME,
+      encodedRegionName));
+    destServerName.ifPresent(serverName -> 
builder.setDestServerName(ProtobufUtil
+        .toServerName(serverName)));
+    return builder.build();
+  }
+
   public static MergeTableRegionsRequest buildMergeTableRegionsRequest(
       final byte[][] encodedNameOfdaughaterRegions,
       final boolean forcible,
@@ -1310,11 +1340,25 @@ public final class RequestConverter {
    * @param pattern The compiled regular expression to match against
    * @param includeSysTables False to match only against userspace tables
    * @return a GetTableDescriptorsRequest
+   * @deprecated Use {@link #buildGetTableDescriptorsRequest(Optional, 
boolean)} instead.
    */
+  @Deprecated
   public static GetTableDescriptorsRequest 
buildGetTableDescriptorsRequest(final Pattern pattern,
       boolean includeSysTables) {
+    return buildGetTableDescriptorsRequest(Optional.ofNullable(pattern), 
includeSysTables);
+  }
+
+  /**
+   * Creates a protocol buffer GetTableDescriptorsRequest
+   *
+   * @param pattern The compiled regular expression to match against
+   * @param includeSysTables False to match only against userspace tables
+   * @return a GetTableDescriptorsRequest
+   */
+  public static GetTableDescriptorsRequest
+      buildGetTableDescriptorsRequest(Optional<Pattern> pattern, boolean 
includeSysTables) {
     GetTableDescriptorsRequest.Builder builder = 
GetTableDescriptorsRequest.newBuilder();
-    if (pattern != null) builder.setRegex(pattern.toString());
+    pattern.ifPresent(p -> builder.setRegex(p.toString()));
     builder.setIncludeSysTables(includeSysTables);
     return builder.build();
   }
@@ -1325,11 +1369,25 @@ public final class RequestConverter {
    * @param pattern The compiled regular expression to match against
    * @param includeSysTables False to match only against userspace tables
    * @return a GetTableNamesRequest
+   * @deprecated Use {@link #buildGetTableNamesRequest(Optional, boolean)} 
instead.
    */
+  @Deprecated
   public static GetTableNamesRequest buildGetTableNamesRequest(final Pattern 
pattern,
       boolean includeSysTables) {
+    return buildGetTableNamesRequest(Optional.ofNullable(pattern), 
includeSysTables);
+  }
+
+  /**
+   * Creates a protocol buffer GetTableNamesRequest
+   *
+   * @param pattern The compiled regular expression to match against
+   * @param includeSysTables False to match only against userspace tables
+   * @return a GetTableNamesRequest
+   */
+  public static GetTableNamesRequest 
buildGetTableNamesRequest(Optional<Pattern> pattern,
+      boolean includeSysTables) {
     GetTableNamesRequest.Builder builder = GetTableNamesRequest.newBuilder();
-    if (pattern != null) builder.setRegex(pattern.toString());
+    pattern.ifPresent(p -> builder.setRegex(p.toString()));
     builder.setIncludeSysTables(includeSysTables);
     return builder.build();
   }
@@ -1635,11 +1693,18 @@ public final class RequestConverter {
     return builder.build();
   }
 
+  /**
+   * @deprecated Use {@link #buildListReplicationPeersRequest(Optional)} 
instead.
+   */
+  @Deprecated
   public static ListReplicationPeersRequest 
buildListReplicationPeersRequest(Pattern pattern) {
+    return buildListReplicationPeersRequest(Optional.ofNullable(pattern));
+  }
+
+  public static ListReplicationPeersRequest
+      buildListReplicationPeersRequest(Optional<Pattern> pattern) {
     ListReplicationPeersRequest.Builder builder = 
ListReplicationPeersRequest.newBuilder();
-    if (pattern != null) {
-      builder.setRegex(pattern.toString());
-    }
+    pattern.ifPresent(p -> builder.setRegex(p.toString()));
     return builder.build();
   }
 

Reply via email to