Repository: hbase
Updated Branches:
  refs/heads/master af9d359b8 -> 9a78d0088


HBASE-17389 Convert all internal usages from ReplicationAdmin to Admin


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a78d008
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a78d008
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a78d008

Branch: refs/heads/master
Commit: 9a78d008841726ec2029215cddf0c0b2141771ae
Parents: af9d359
Author: Guanghao Zhang <zg...@apache.org>
Authored: Tue Feb 7 10:18:59 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Tue Feb 7 10:18:59 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Admin.java   | 26 ++++++
 .../apache/hadoop/hbase/client/HBaseAdmin.java  | 83 +++++++++++++++++++
 .../client/replication/ReplicationAdmin.java    | 33 ++++++++
 .../hbase/client/replication/TableCFs.java      | 12 +++
 .../org/apache/hadoop/hbase/master/HMaster.java |  2 +
 .../master/cleaner/ReplicationMetaCleaner.java  | 18 ++--
 .../regionserver/DumpReplicationQueues.java     | 57 ++++++-------
 .../hbase/util/ServerRegionReplicaUtil.java     | 12 +--
 .../replication/TestMasterReplication.java      | 47 ++++-------
 ...sibilityLabelReplicationWithExpAsString.java |  4 +-
 .../TestVisibilityLabelsReplication.java        |  6 +-
 .../src/main/ruby/hbase/replication_admin.rb    | 63 +++++++-------
 .../src/main/ruby/shell/commands/list_peers.rb  | 16 ++--
 .../shell/commands/list_replicated_tables.rb    | 17 ++--
 .../test/ruby/hbase/replication_admin_test.rb   | 86 ++++++++------------
 15 files changed, 305 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 232dbf4..cc14acd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter;
 import org.apache.hadoop.hbase.quotas.QuotaRetriever;
 import org.apache.hadoop.hbase.quotas.QuotaSettings;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
@@ -1907,6 +1909,30 @@ public interface Admin extends Abortable, Closeable {
   }
 
   /**
+   * Append the replicable table-cf config of the specified peer
+   * @param id a short that identifies the cluster
+   * @param tableCfs A map from tableName to column family names
+   * @throws ReplicationException
+   * @throws IOException
+   */
+  default void appendReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws 
ReplicationException,
+      IOException {
+  }
+
+  /**
+   * Remove some table-cfs from config of the specified peer
+   * @param id a short name that identifies the cluster
+   * @param tableCfs A map from tableName to column family names
+   * @throws ReplicationException
+   * @throws IOException
+   */
+  default void removeReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws 
ReplicationException,
+      IOException {
+  }
+
+  /**
    * Return a list of replication peers.
    * @return a list of replication peers description
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 4e0a6c7..65070b9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -23,10 +23,13 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
@@ -81,6 +84,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter;
 import org.apache.hadoop.hbase.quotas.QuotaRetriever;
 import org.apache.hadoop.hbase.quotas.QuotaSettings;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@@ -197,6 +201,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
@@ -3872,6 +3877,84 @@ public class HBaseAdmin implements Admin {
   }
 
   @Override
+  public void appendReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws 
ReplicationException,
+      IOException {
+    if (tableCfs == null) {
+      throw new ReplicationException("tableCfs is null");
+    }
+    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
+    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
+    if (preTableCfs == null) {
+      peerConfig.setTableCFsMap(tableCfs);
+    } else {
+      for (Map.Entry<TableName, ? extends Collection<String>> entry : 
tableCfs.entrySet()) {
+        TableName table = entry.getKey();
+        Collection<String> appendCfs = entry.getValue();
+        if (preTableCfs.containsKey(table)) {
+          List<String> cfs = preTableCfs.get(table);
+          if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
+            preTableCfs.put(table, null);
+          } else {
+            Set<String> cfSet = new HashSet<String>(cfs);
+            cfSet.addAll(appendCfs);
+            preTableCfs.put(table, Lists.newArrayList(cfSet));
+          }
+        } else {
+          if (appendCfs == null || appendCfs.isEmpty()) {
+            preTableCfs.put(table, null);
+          } else {
+            preTableCfs.put(table, Lists.newArrayList(appendCfs));
+          }
+        }
+      }
+    }
+    updateReplicationPeerConfig(id, peerConfig);
+  }
+
+  @Override
+  public void removeReplicationPeerTableCFs(String id,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws 
ReplicationException,
+      IOException {
+    if (tableCfs == null) {
+      throw new ReplicationException("tableCfs is null");
+    }
+    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
+    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
+    if (preTableCfs == null) {
+      throw new ReplicationException("Table-Cfs for peer" + id + " is null");
+    }
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : 
tableCfs.entrySet()) {
+
+      TableName table = entry.getKey();
+      Collection<String> removeCfs = entry.getValue();
+      if (preTableCfs.containsKey(table)) {
+        List<String> cfs = preTableCfs.get(table);
+        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
+          preTableCfs.remove(table);
+        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) 
{
+          Set<String> cfSet = new HashSet<String>(cfs);
+          cfSet.removeAll(removeCfs);
+          if (cfSet.isEmpty()) {
+            preTableCfs.remove(table);
+          } else {
+            preTableCfs.put(table, Lists.newArrayList(cfSet));
+          }
+        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) 
{
+          throw new ReplicationException("Cannot remove cf of table: " + table
+              + " which doesn't specify cfs from table-cfs config in peer: " + 
id);
+        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
+          throw new ReplicationException("Cannot remove table: " + table
+              + " which has specified cfs from table-cfs config in peer: " + 
id);
+        }
+      } else {
+        throw new ReplicationException("No table: " + table + " in table-cfs 
config of peer: " + id);
+      }
+    }
+    updateReplicationPeerConfig(id, peerConfig);
+  }
+
+  @Override
   public List<ReplicationPeerDescription> listReplicationPeers() throws 
IOException {
     return listReplicationPeers((Pattern)null);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index c6d580b..706f81e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -194,7 +194,11 @@ public class ReplicationAdmin implements Closeable {
    * Add a new remote slave cluster for replication.
    * @param id a short name that identifies the cluster
    * @param peerConfig configuration for the replication slave cluster
+   * @deprecated use
+   *             {@link 
org.apache.hadoop.hbase.client.Admin#addReplicationPeer(String, 
ReplicationPeerConfig)}
+   *             instead
    */
+  @Deprecated
   public void addPeer(String id, ReplicationPeerConfig peerConfig) throws 
ReplicationException,
       IOException {
     checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
@@ -210,6 +214,12 @@ public class ReplicationAdmin implements Closeable {
     return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
   }
 
+  /**
+   * @deprecated use
+   *             {@link 
org.apache.hadoop.hbase.client.Admin#updateReplicationPeerConfig(String, 
ReplicationPeerConfig)}
+   *             instead
+   */
+  @Deprecated
   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) 
throws IOException {
     this.admin.updateReplicationPeerConfig(id, peerConfig);
   }
@@ -217,7 +227,9 @@ public class ReplicationAdmin implements Closeable {
   /**
    * Removes a peer cluster and stops the replication to it.
    * @param id a short name that identifies the cluster
+   * @deprecated use {@link 
org.apache.hadoop.hbase.client.Admin#removeReplicationPeer(String)} instead
    */
+  @Deprecated
   public void removePeer(String id) throws IOException {
     this.admin.removeReplicationPeer(id);
   }
@@ -225,7 +237,10 @@ public class ReplicationAdmin implements Closeable {
   /**
    * Restart the replication stream to the specified peer.
    * @param id a short name that identifies the cluster
+   * @deprecated use {@link 
org.apache.hadoop.hbase.client.Admin#enableReplicationPeer(String)}
+   *             instead
    */
+  @Deprecated
   public void enablePeer(String id) throws IOException {
     this.admin.enableReplicationPeer(id);
   }
@@ -233,7 +248,10 @@ public class ReplicationAdmin implements Closeable {
   /**
    * Stop the replication stream to the specified peer.
    * @param id a short name that identifies the cluster
+   * @deprecated use {@link 
org.apache.hadoop.hbase.client.Admin#disableReplicationPeer(String)}
+   *             instead
    */
+  @Deprecated
   public void disablePeer(String id) throws IOException {
     this.admin.disableReplicationPeer(id);
   }
@@ -242,11 +260,17 @@ public class ReplicationAdmin implements Closeable {
    * Get the number of slave clusters the local cluster has.
    * @return number of slave clusters
    * @throws IOException
+   * @deprecated
    */
+  @Deprecated
   public int getPeersCount() throws IOException {
     return this.admin.listReplicationPeers().size();
   }
 
+  /**
+   * @deprecated use {@link 
org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
+   */
+  @Deprecated
   public Map<String, ReplicationPeerConfig> listPeerConfigs() throws 
IOException {
     List<ReplicationPeerDescription> peers = this.admin.listReplicationPeers();
     Map<String, ReplicationPeerConfig> result = new TreeMap<String, 
ReplicationPeerConfig>();
@@ -256,6 +280,11 @@ public class ReplicationAdmin implements Closeable {
     return result;
   }
 
+  /**
+   * @deprecated use {@link 
org.apache.hadoop.hbase.client.Admin#getReplicationPeerConfig(String)}
+   *             instead
+   */
+  @Deprecated
   public ReplicationPeerConfig getPeerConfig(String id) throws IOException {
     return admin.getReplicationPeerConfig(id);
   }
@@ -294,6 +323,7 @@ public class ReplicationAdmin implements Closeable {
    * @throws ReplicationException
    * @throws IOException
    */
+  @Deprecated
   public void appendPeerTableCFs(String id, Map<TableName, ? extends 
Collection<String>> tableCfs)
       throws ReplicationException, IOException {
     if (tableCfs == null) {
@@ -350,6 +380,7 @@ public class ReplicationAdmin implements Closeable {
    * @throws ReplicationException
    * @throws IOException
    */
+  @Deprecated
   public void removePeerTableCFs(String id, Map<TableName, ? extends 
Collection<String>> tableCfs)
       throws ReplicationException, IOException {
     if (tableCfs == null) {
@@ -398,6 +429,7 @@ public class ReplicationAdmin implements Closeable {
    * to indicate replicating all column families. Pass null for replicating 
all table and column
    * families
    */
+  @Deprecated
   public void setPeerTableCFs(String id, Map<TableName, ? extends 
Collection<String>> tableCfs)
       throws IOException {
     ReplicationPeerConfig peerConfig = getPeerConfig(id);
@@ -411,6 +443,7 @@ public class ReplicationAdmin implements Closeable {
    * an IllegalArgumentException is thrown if it doesn't exist
    * @return true if replication is enabled to that peer, false if it isn't
    */
+  @Deprecated
   public boolean getPeerState(String id) throws ReplicationException, 
IOException {
     List<ReplicationPeerDescription> peers = admin.listReplicationPeers(id);
     if (peers.isEmpty() || !id.equals(peers.get(0).getPeerId())) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java
index fc39087..f293586 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client.replication;
 
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -46,4 +47,15 @@ public class TableCFs {
   public Map<String, Integer> getColumnFamilyMap() {
     return this.cfs;
   }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(table.getNameAsString());
+    if (!cfs.isEmpty()) {
+      sb.append(":");
+      sb.append(StringUtils.join(cfs.keySet(), ','));
+    }
+    return sb.toString();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 3374405..4aff21a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -3177,6 +3177,8 @@ public class HMaster extends HRegionServer implements 
MasterServices {
       cpHost.preGetReplicationPeerConfig(peerId);
     }
     final ReplicationPeerConfig peerConfig = 
this.replicationManager.getPeerConfig(peerId);
+    LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + 
peerId + ", config="
+        + peerConfig);
     if (cpHost != null) {
       cpHost.postGetReplicationPeerConfig(peerId);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
index b133c56..5c56271 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java
@@ -35,11 +35,11 @@ import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -50,14 +50,14 @@ public class ReplicationMetaCleaner extends ScheduledChore {
 
   private static final Log LOG = 
LogFactory.getLog(ReplicationMetaCleaner.class);
 
-  private ReplicationAdmin replicationAdmin;
-  private MasterServices master;
+  private final Admin admin;
+  private final MasterServices master;
 
   public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, 
int period)
       throws IOException {
     super("ReplicationMetaCleaner", stoppable, period);
     this.master = master;
-    replicationAdmin = new ReplicationAdmin(master.getConfiguration());
+    admin = master.getConnection().getAdmin();
   }
 
   @Override
@@ -81,12 +81,12 @@ public class ReplicationMetaCleaner extends ScheduledChore {
         return;
       }
 
-      Map<String, ReplicationPeerConfig> peers = 
replicationAdmin.listPeerConfigs();
-      for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
-        for (Map.Entry<TableName, List<String>> map : 
entry.getValue().getTableCFsMap()
+      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
+      for (ReplicationPeerDescription peerDesc : peers) {
+        for (Map.Entry<TableName, List<String>> map : 
peerDesc.getPeerConfig().getTableCFsMap()
             .entrySet()) {
           if (serialTables.containsKey(map.getKey().getNameAsString())) {
-            
serialTables.get(map.getKey().getNameAsString()).add(entry.getKey());
+            
serialTables.get(map.getKey().getNameAsString()).add(peerDesc.getPeerId());
             break;
           }
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 766b551..9a1e2bc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -21,12 +21,12 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,15 +37,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.io.WALLink;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -207,8 +208,8 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
 
     Configuration conf = getConf();
     HBaseAdmin.available(conf);
-    ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
     ClusterConnection connection = (ClusterConnection) 
ConnectionFactory.createConnection(conf);
+    Admin admin = connection.getAdmin();
 
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" 
+ System.currentTimeMillis(),
         new WarnOnlyAbortable(), true);
@@ -216,26 +217,28 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
     try {
       // Our zk watcher
       LOG.info("Our Quorum: " + zkw.getQuorum());
-      List<HashMap<String, String>> replicatedTables = 
replicationAdmin.listReplicated();
-      if (replicatedTables.isEmpty()) {
+      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
+      if (replicatedTableCFs.isEmpty()) {
         LOG.info("No tables with a configured replication peer were found.");
         return(0);
       } else {
-        LOG.info("Replicated Tables: " + replicatedTables);
+        LOG.info("Replicated Tables: " + replicatedTableCFs);
       }
 
-      Map<String, ReplicationPeerConfig> peerConfigs = 
replicationAdmin.listPeerConfigs();
+      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
 
-      if (peerConfigs.isEmpty()) {
+      if (peers.isEmpty()) {
         LOG.info("Replication is enabled but no peer configuration was 
found.");
       }
 
       System.out.println("Dumping replication peers and configurations:");
-      System.out.println(dumpPeersState(replicationAdmin, peerConfigs));
+      System.out.println(dumpPeersState(peers));
 
       if (opts.isDistributed()) {
         LOG.info("Found [--distributed], will poll each RegionServer.");
-        System.out.println(dumpQueues(connection, zkw, peerConfigs.keySet(), 
opts.isHdfs()));
+        Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId())
+            .collect(Collectors.toSet());
+        System.out.println(dumpQueues(connection, zkw, peerIds, 
opts.isHdfs()));
         System.out.println(dumpReplicationSummary());
       } else {
         // use ZK instead
@@ -279,28 +282,22 @@ public class DumpReplicationQueues extends Configured 
implements Tool {
     return sb.toString();
   }
 
-  public String dumpPeersState(ReplicationAdmin replicationAdmin,
-      Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
+  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws 
Exception {
     Map<String, String> currentConf;
     StringBuilder sb = new StringBuilder();
-    for (Map.Entry<String, ReplicationPeerConfig> peer : 
peerConfigs.entrySet()) {
-      try {
-        ReplicationPeerConfig peerConfig = peer.getValue();
-        sb.append("Peer: " + peer.getKey() + "\n");
-        sb.append("    " + "State: "
-            + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : 
"DISABLED") + "\n");
-        sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + 
"\n");
-        sb.append("    " + "Replication Endpoint: " + 
peerConfig.getReplicationEndpointImpl() + "\n");
-        currentConf = peerConfig.getConfiguration();
-        // Only show when we have a custom configuration for the peer
-        if (currentConf.size() > 1) {
-          sb.append("    " + "Peer Configuration: " + currentConf + "\n");
-        }
-        sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + 
"\n");
-        sb.append("    " + "Peer Namespaces: " + peerConfig.getNamespaces() + 
"\n");
-      } catch (ReplicationException re) {
-        sb.append("Got an exception while invoking ReplicationAdmin: " + re + 
"\n");
+    for (ReplicationPeerDescription peer : peers) {
+      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
+      sb.append("Peer: " + peer.getPeerId() + "\n");
+      sb.append("    " + "State: " + (peer.isEnabled() ? "ENABLED" : 
"DISABLED") + "\n");
+      sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
+      sb.append("    " + "Replication Endpoint: " + 
peerConfig.getReplicationEndpointImpl() + "\n");
+      currentConf = peerConfig.getConfiguration();
+      // Only show when we have a custom configuration for the peer
+      if (currentConf.size() > 1) {
+        sb.append("    " + "Peer Configuration: " + currentConf + "\n");
       }
+      sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + 
"\n");
+      sb.append("    " + "Peer Namespaces: " + peerConfig.getNamespaces() + 
"\n");
     }
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 9ecc9eb..648ccc6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.io.HFileLink;
@@ -148,10 +150,10 @@ public class ServerRegionReplicaUtil extends 
RegionReplicaUtil {
     if (!isRegionReplicaReplicationEnabled(conf)) {
       return;
     }
-    ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
+    Admin admin = ConnectionFactory.createConnection(conf).getAdmin();
     ReplicationPeerConfig peerConfig = null;
     try {
-      peerConfig = repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER);
+      peerConfig = 
admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
     } catch (ReplicationPeerNotFoundException e) {
       LOG.warn("Region replica replication peer id=" + 
REGION_REPLICA_REPLICATION_PEER
           + " not exist", e);
@@ -163,12 +165,10 @@ public class ServerRegionReplicaUtil extends 
RegionReplicaUtil {
         peerConfig = new ReplicationPeerConfig();
         peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
         
peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
-        repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
+        admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig);
       }
-    } catch (ReplicationException ex) {
-      throw new IOException(ex);
     } finally {
-      repAdmin.close();
+      admin.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index c1b6f4a..3b6718a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -514,51 +514,34 @@ public class TestMasterReplication {
 
   private void addPeer(String id, int masterClusterNumber,
       int slaveClusterNumber) throws Exception {
-    ReplicationAdmin replicationAdmin = null;
-    try {
-      replicationAdmin = new ReplicationAdmin(
-          configurations[masterClusterNumber]);
-      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-      rpc.setClusterKey(utilities[slaveClusterNumber].getClusterKey());
-      replicationAdmin.addPeer(id, rpc, null);
-    } finally {
-      close(replicationAdmin);
+    try (Admin admin = 
ConnectionFactory.createConnection(configurations[masterClusterNumber])
+        .getAdmin()) {
+      admin.addReplicationPeer(id,
+        new 
ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
     }
   }
 
   private void addPeer(String id, int masterClusterNumber, int 
slaveClusterNumber, String tableCfs)
       throws Exception {
-    ReplicationAdmin replicationAdmin = null;
-    try {
-      replicationAdmin = new 
ReplicationAdmin(configurations[masterClusterNumber]);
-      ReplicationPeerConfig replicationPeerConfig = new 
ReplicationPeerConfig();
-      
replicationPeerConfig.setClusterKey(utilities[slaveClusterNumber].getClusterKey());
-      replicationAdmin.addPeer(id, replicationPeerConfig,
-        ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
-    } finally {
-      close(replicationAdmin);
+    try (Admin admin = 
ConnectionFactory.createConnection(configurations[masterClusterNumber])
+        .getAdmin()) {
+      admin.addReplicationPeer(id,
+        new 
ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
+            
.setTableCFsMap(ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)));
     }
   }
 
   private void disablePeer(String id, int masterClusterNumber) throws 
Exception {
-    ReplicationAdmin replicationAdmin = null;
-    try {
-      replicationAdmin = new ReplicationAdmin(
-          configurations[masterClusterNumber]);
-      replicationAdmin.disablePeer(id);
-    } finally {
-      close(replicationAdmin);
+    try (Admin admin = 
ConnectionFactory.createConnection(configurations[masterClusterNumber])
+        .getAdmin()) {
+      admin.disableReplicationPeer(id);
     }
   }
 
   private void enablePeer(String id, int masterClusterNumber) throws Exception 
{
-    ReplicationAdmin replicationAdmin = null;
-    try {
-      replicationAdmin = new ReplicationAdmin(
-          configurations[masterClusterNumber]);
-      replicationAdmin.enablePeer(id);
-    } finally {
-      close(replicationAdmin);
+    try (Admin admin = 
ConnectionFactory.createConnection(configurations[masterClusterNumber])
+        .getAdmin()) {
+      admin.enableReplicationPeer(id);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
index dd1fe2a..13d0e3c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java
@@ -113,7 +113,7 @@ public class TestVisibilityLabelReplicationWithExpAsString 
extends TestVisibilit
     TEST_UTIL.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
     zkw1 = new ZooKeeperWatcher(conf, "cluster1", null, true);
-    replicationAdmin = new ReplicationAdmin(conf);
+    admin = TEST_UTIL.getAdmin();
 
     // Base conf2 on conf1 so it gets the right zk cluster.
     conf1 = HBaseConfiguration.create(conf);
@@ -136,7 +136,7 @@ public class TestVisibilityLabelReplicationWithExpAsString 
extends TestVisibilit
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(TEST_UTIL1.getClusterKey());
-    replicationAdmin.addPeer("2", rpc, null);
+    admin.addReplicationPeer("2", rpc);
 
     HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
     HColumnDescriptor desc = new HColumnDescriptor(fam);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index 5977c23..2181ddb 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -89,7 +89,7 @@ public class TestVisibilityLabelsReplication {
   protected static Configuration conf;
   protected static Configuration conf1;
   protected static TableName TABLE_NAME = TableName.valueOf("TABLE_NAME");
-  protected static ReplicationAdmin replicationAdmin;
+  protected static Admin admin;
   public static final String TOPSECRET = "topsecret";
   public static final String PUBLIC = "public";
   public static final String PRIVATE = "private";
@@ -161,7 +161,7 @@ public class TestVisibilityLabelsReplication {
     TEST_UTIL.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
     zkw1 = new ZooKeeperWatcher(conf, "cluster1", null, true);
-    replicationAdmin = new ReplicationAdmin(conf);
+    admin = TEST_UTIL.getAdmin();
 
     // Base conf2 on conf1 so it gets the right zk cluster.
     conf1 = HBaseConfiguration.create(conf);
@@ -185,7 +185,7 @@ public class TestVisibilityLabelsReplication {
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(TEST_UTIL1.getClusterKey());
-    replicationAdmin.addPeer("2", rpc, null);
+    admin.addReplicationPeer("2", rpc);
 
     Admin hBaseAdmin = TEST_UTIL.getAdmin();
     HTableDescriptor table = new HTableDescriptor(TABLE_NAME);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb 
b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 3c94db2..b9df821 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -20,6 +20,7 @@
 include Java
 
 java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+java_import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper
 java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig
 java_import org.apache.hadoop.hbase.util.Bytes
 java_import org.apache.hadoop.hbase.zookeeper.ZKConfig
@@ -34,6 +35,7 @@ module Hbase
     def initialize(configuration)
       @replication_admin = ReplicationAdmin.new(configuration)
       @configuration = configuration
+      @admin = ConnectionFactory.createConnection(configuration).getAdmin
     end
 
     
#----------------------------------------------------------------------------------------------
@@ -100,7 +102,7 @@ module Hbase
           }
           replication_peer_config.set_table_cfs_map(map)
         end
-        @replication_admin.add_peer(id, replication_peer_config)
+        @admin.addReplicationPeer(id, replication_peer_config)
       else
         raise(ArgumentError, "args must be a Hash")
       end
@@ -109,46 +111,40 @@ module Hbase
     
#----------------------------------------------------------------------------------------------
     # Remove a peer cluster, stops the replication
     def remove_peer(id)
-      @replication_admin.removePeer(id)
+      @admin.removeReplicationPeer(id)
     end
 
-
     
#---------------------------------------------------------------------------------------------
     # Show replcated tables/column families, and their ReplicationType
     def list_replicated_tables(regex = ".*")
       pattern = java.util.regex.Pattern.compile(regex)
-      list = @replication_admin.listReplicated()
-      list.select {|s| pattern.match(s.get(ReplicationAdmin::TNAME))}
+      list = @admin.listReplicatedTableCFs()
+      list.select {|t| pattern.match(t.getTable().getNameAsString())}
     end
 
     
#----------------------------------------------------------------------------------------------
     # List all peer clusters
     def list_peers
-      @replication_admin.listPeerConfigs
-    end
-
-    
#----------------------------------------------------------------------------------------------
-    # Get peer cluster state
-    def get_peer_state(id)
-      @replication_admin.getPeerState(id) ? "ENABLED" : "DISABLED"
+      @admin.listReplicationPeers
     end
 
     
#----------------------------------------------------------------------------------------------
     # Restart the replication stream to the specified peer
     def enable_peer(id)
-      @replication_admin.enablePeer(id)
+      @admin.enableReplicationPeer(id)
     end
 
     
#----------------------------------------------------------------------------------------------
     # Stop the replication stream to the specified peer
     def disable_peer(id)
-      @replication_admin.disablePeer(id)
+      @admin.disableReplicationPeer(id)
     end
 
     
#----------------------------------------------------------------------------------------------
     # Show the current tableCFs config for the specified peer
     def show_peer_tableCFs(id)
-      @replication_admin.getPeerTableCFs(id)
+      rpc = @admin.getReplicationPeerConfig(id)
+      ReplicationSerDeHelper.convertToString(rpc.getTableCFsMap())
     end
 
     
#----------------------------------------------------------------------------------------------
@@ -160,8 +156,12 @@ module Hbase
         tableCFs.each{|key, val|
           map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
         }
+        rpc = get_peer_config(id)
+        unless rpc.nil?
+          rpc.setTableCFsMap(map)
+          @admin.updateReplicationPeerConfig(id, rpc)
+        end
       end
-      @replication_admin.setPeerTableCFs(id, map)
     end
 
     
#----------------------------------------------------------------------------------------------
@@ -174,7 +174,7 @@ module Hbase
           map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
         }
       end
-      @replication_admin.appendPeerTableCFs(id, map)
+      @admin.appendReplicationPeerTableCFs(id, map)
     end
 
     
#----------------------------------------------------------------------------------------------
@@ -187,7 +187,7 @@ module Hbase
           map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
         }
       end
-      @replication_admin.removePeerTableCFs(id, map)
+      @admin.removeReplicationPeerTableCFs(id, map)
     end
 
     # Set new namespaces config for the specified peer
@@ -200,7 +200,7 @@ module Hbase
         rpc = get_peer_config(id)
         unless rpc.nil?
           rpc.setNamespaces(ns_set)
-          @replication_admin.updatePeerConfig(id, rpc)
+          @admin.updateReplicationPeerConfig(id, rpc)
         end
       end
     end
@@ -218,7 +218,7 @@ module Hbase
             ns_set.add(n)
           end
           rpc.setNamespaces(ns_set)
-          @replication_admin.updatePeerConfig(id, rpc)
+          @admin.updateReplicationPeerConfig(id, rpc)
         end
       end
     end
@@ -235,7 +235,7 @@ module Hbase
             end
           end
           rpc.setNamespaces(ns_set)
-          @replication_admin.updatePeerConfig(id, rpc)
+          @admin.updateReplicationPeerConfig(id, rpc)
         end
       end
     end
@@ -257,7 +257,7 @@ module Hbase
       rpc = get_peer_config(id)
       unless rpc.nil?
         rpc.setBandwidth(bandwidth)
-        @replication_admin.updatePeerConfig(id, rpc)
+        @admin.updateReplicationPeerConfig(id, rpc)
       end
     end
 
@@ -265,26 +265,27 @@ module Hbase
     # Enables a table's replication switch
     def enable_tablerep(table_name)
       tableName = TableName.valueOf(table_name)
-      @replication_admin.enableTableRep(tableName)
+      @admin.enableTableReplication(tableName)
     end
 
     
#----------------------------------------------------------------------------------------------
     # Disables a table's replication switch
     def disable_tablerep(table_name)
       tableName = TableName.valueOf(table_name)
-      @replication_admin.disableTableRep(tableName)
+      @admin.disableTableReplication(tableName)
     end
 
     def list_peer_configs
-      @replication_admin.list_peer_configs
+      map = java.util.HashMap.new
+      peers = @admin.listReplicationPeers
+      peers.each do |peer|
+        map.put(peer.getPeerId, peer.getPeerConfig)
+      end
+      return map
     end
 
     def get_peer_config(id)
-      @replication_admin.get_peer_config(id)
-    end
-
-    def peer_added(id)
-      @replication_admin.peer_added(id)
+      @admin.getReplicationPeerConfig(id)
     end
 
     def update_peer_config(id, args={})
@@ -306,7 +307,7 @@ module Hbase
         }
       end
 
-      @replication_admin.update_peer_config(id, replication_peer_config)
+      @admin.updateReplicationPeerConfig(id, replication_peer_config)
     end
   end
 end

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb 
b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index 7d53158..2dd8483 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -35,13 +35,15 @@ EOF
         formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
           "STATE", "NAMESPACES", "TABLE_CFS", "BANDWIDTH"])
 
-        peers.entrySet().each do |e|
-          state = replication_admin.get_peer_state(e.key)
-          namespaces = replication_admin.show_peer_namespaces(e.value)
-          tableCFs = replication_admin.show_peer_tableCFs(e.key)
-          formatter.row([ e.key, e.value.getClusterKey,
-            e.value.getReplicationEndpointImpl, state, namespaces, tableCFs,
-            e.value.getBandwidth ])
+        peers.each do |peer|
+          id = peer.getPeerId
+          state = peer.isEnabled ? "ENABLED" : "DISABLED"
+          config = peer.getPeerConfig
+          namespaces = replication_admin.show_peer_namespaces(config)
+          tableCFs = replication_admin.show_peer_tableCFs(id)
+          formatter.row([ id, config.getClusterKey,
+            config.getReplicationEndpointImpl, state, namespaces, tableCFs,
+            config.getBandwidth ])
         end
 
         formatter.footer()

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb 
b/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb
index 142adfc..4200cae 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb
@@ -34,12 +34,19 @@ EOF
         formatter.header([ "TABLE:COLUMNFAMILY", "ReplicationType" ], [ 32 ])
         list = replication_admin.list_replicated_tables(regex)
         list.each do |e|
-          if 
e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::REPLICATIONTYPE)
 == 
org.apache.hadoop.hbase.client.replication.ReplicationAdmin::REPLICATIONGLOBAL
-             replicateType = "GLOBAL"
-          else
-             replicateType = "unknown"
+          map = e.getColumnFamilyMap()
+          map.each do |cf|
+            if cf[1] == 
org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_LOCAL
+              replicateType = "LOCAL"
+            elsif cf[1] == 
org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_GLOBAL
+              replicateType = "GLOBAL"
+            elsif cf[1] == 
org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_SERIAL
+              replicateType = "SERIAL"
+            else
+              replicateType = "UNKNOWN"
+            end
+            formatter.row([e.getTable().getNameAsString() + ":" + cf[0], 
replicateType], true, [32])
           end
-          
formatter.row([e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::TNAME)
 + ":" + 
e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::CFNAME), 
replicateType], true, [32])
         end
         formatter.footer()
       end

http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb 
b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index cd1fe35..0d92287 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -73,8 +73,8 @@ module Hbase
       command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, 
command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(cluster_key, 
command(:list_peers).get(0).getPeerConfig.getClusterKey)
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -86,8 +86,8 @@ module Hbase
       command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, 
command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(cluster_key, 
command(:list_peers).get(0).getPeerConfig.getClusterKey)
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -100,8 +100,8 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, 
command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(cluster_key, 
command(:list_peers).get(0).getPeerConfig.getClusterKey)
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -114,8 +114,8 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, 
command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(cluster_key, 
command(:list_peers).get(0).getPeerConfig.getClusterKey)
 
       # cleanup for future tests
       command(:remove_peer, @peer_id)
@@ -130,8 +130,8 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(cluster_key, peer_config.get_cluster_key)
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
@@ -152,8 +152,8 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(cluster_key, peer_config.get_cluster_key)
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
@@ -186,8 +186,8 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, 
command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(cluster_key, 
command(:list_peers).get(0).getPeerConfig.getClusterKey)
       assert_tablecfs_equal(table_cfs, command(:get_peer_config, 
@peer_id).getTableCFsMap())
 
       # cleanup for future tests
@@ -210,8 +210,8 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, 
command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(cluster_key, 
command(:list_peers).get(0).getPeerConfig.getClusterKey)
 
       table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => 
["cf1", "cf2"] }
       command(:set_peer_tableCFs, @peer_id, table_cfs)
@@ -227,8 +227,8 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, 
command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(cluster_key, 
command(:list_peers).get(0).getPeerConfig.getClusterKey)
 
       table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
       command(:append_peer_tableCFs, @peer_id, table_cfs)
@@ -249,8 +249,8 @@ module Hbase
       command(:add_peer, @peer_id, args)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      assert_equal(cluster_key, 
command(:list_peers).fetch(@peer_id).get_cluster_key)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      assert_equal(cluster_key, 
command(:list_peers).get(0).getPeerConfig.getClusterKey)
 
       table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
       command(:remove_peer_tableCFs, @peer_id, { "ns3:table3" => ["cf1", 
"cf2"] })
@@ -268,15 +268,11 @@ module Hbase
       args = { CLUSTER_KEY => cluster_key }
       command(:add_peer, @peer_id, args)
 
-      # Normally the ReplicationSourceManager will call 
ReplicationPeer#peer_added
-      # but here we have to do it ourselves
-      replication_admin.peer_added(@peer_id)
-
       command(:set_peer_namespaces, @peer_id, namespaces)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
 
@@ -292,15 +288,11 @@ module Hbase
       args = { CLUSTER_KEY => cluster_key }
       command(:add_peer, @peer_id, args)
 
-      # Normally the ReplicationSourceManager will call 
ReplicationPeer#peer_added
-      # but here we have to do it ourselves
-      replication_admin.peer_added(@peer_id)
-
       command(:append_peer_namespaces, @peer_id, namespaces)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
 
@@ -309,8 +301,8 @@ module Hbase
       command(:append_peer_namespaces, @peer_id, namespaces)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
 
@@ -318,8 +310,8 @@ module Hbase
       command(:append_peer_namespaces, @peer_id, namespaces)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
 
@@ -334,17 +326,13 @@ module Hbase
       args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces }
       command(:add_peer, @peer_id, args)
 
-      # Normally the ReplicationSourceManager will call 
ReplicationPeer#peer_added
-      # but here we have to do it ourselves
-      replication_admin.peer_added(@peer_id)
-
       namespaces = ["ns1", "ns2"]
       namespaces_str = "ns3"
       command(:remove_peer_namespaces, @peer_id, namespaces)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
 
@@ -353,8 +341,8 @@ module Hbase
       command(:remove_peer_namespaces, @peer_id, namespaces)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
 
@@ -362,8 +350,8 @@ module Hbase
       command(:remove_peer_namespaces, @peer_id, namespaces)
 
       assert_equal(1, command(:list_peers).length)
-      assert(command(:list_peers).key?(@peer_id))
-      peer_config = command(:list_peers).fetch(@peer_id)
+      assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
+      peer_config = command(:list_peers).get(0).getPeerConfig
       assert_equal(namespaces_str,
         replication_admin.show_peer_namespaces(peer_config))
 
@@ -375,9 +363,6 @@ module Hbase
       cluster_key = "localhost:2181:/hbase-test"
       args = { CLUSTER_KEY => cluster_key }
       command(:add_peer, @peer_id, args)
-      # Normally the ReplicationSourceManager will call 
ReplicationPeer#peer_added
-      # but here we have to do it ourselves
-      replication_admin.peer_added(@peer_id)
 
       peer_config = command(:get_peer_config, @peer_id)
       assert_equal(0, peer_config.get_bandwidth)
@@ -442,9 +427,6 @@ module Hbase
       args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA 
=> data_params}
       command(:add_peer, @peer_id, args)
 
-      #Normally the ReplicationSourceManager will call 
ReplicationPeer#peer_added, but here we have to do it ourselves
-      replication_admin.peer_added(@peer_id)
-
       new_config_params = { "config1" => "new_value1" }
       new_data_params = {"data1" => "new_value1"}
       new_args = {CONFIG => new_config_params, DATA => new_data_params}

Reply via email to